4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2013, Intel Corporation.
25 * lustre/mdt/out_handler.c
27 * Object update handler between targets.
29 * Author: di.wang <di.wang@intel.com>
32 #define DEBUG_SUBSYSTEM S_CLASS
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
/*
 * Fill slot i of ta->ta_args with the execute callback func, the undo
 * callback undo and the caller's source location (file, line), then return
 * a pointer to that slot. The slot index is asserted to be below
 * UPDATE_MAX_OPS.
 * NOTE(review): presumably i is the current ta->ta_argno, which is then
 * advanced - confirm.
 */
39 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
40 tx_exec_func_t undo, char *file, int line)
48 LASSERT(i < UPDATE_MAX_OPS);
52 ta->ta_args[i].exec_fn = func;
53 ta->ta_args[i].undo_fn = undo;
54 ta->ta_args[i].file = file;
55 ta->ta_args[i].line = line;
57 return &ta->ta_args[i];
/*
 * Begin a new update transaction: clear *ta and create a transaction handle
 * on dt with dt_trans_create(). If handle creation fails, the error is
 * logged and PTR_ERR() of the handle is returned. When the export has
 * exp_need_sync set, the handle is flagged th_sync.
 */
60 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
61 struct thandle_exec_args *ta, struct obd_export *exp)
63 memset(ta, 0, sizeof(*ta));
64 ta->ta_handle = dt_trans_create(env, dt);
65 if (IS_ERR(ta->ta_handle)) {
66 CERROR("%s: start handle error: rc = %ld\n",
67 dt_obd_name(dt), PTR_ERR(ta->ta_handle));
68 return PTR_ERR(ta->ta_handle);
71 if (exp->exp_need_sync)
72 ta->ta_handle->th_sync = 1;
/*
 * Start the transaction recorded in ta: forwards ta->ta_dev and
 * ta->ta_handle to dt_trans_start() and returns its result.
 */
77 static int out_trans_start(const struct lu_env *env,
78 struct thandle_exec_args *ta)
80 return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
/*
 * Stop the transaction recorded in ta. err is stored as the handle's
 * th_result before dt_trans_stop() is called. Afterwards the object
 * reference held by each queued argument (ta_args[0 .. ta_argno - 1]) is
 * dropped with lu_object_put() and the pointer is cleared.
 */
83 static int out_trans_stop(const struct lu_env *env,
84 struct thandle_exec_args *ta, int err)
89 ta->ta_handle->th_result = err;
90 rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
91 for (i = 0; i < ta->ta_argno; i++) {
92 if (ta->ta_args[i].object != NULL) {
93 lu_object_put(env, &ta->ta_args[i].object->do_lu);
94 ta->ta_args[i].object = NULL;
/*
 * Execute and finish the update transaction described by ta.
 *
 * If a declare step recorded an error in ta->ta_err, or no operation was
 * queued (ta_argno == 0), execution is skipped and control jumps to the
 * stop label with rc = ta->ta_err. Otherwise the transaction is started and
 * the queued exec_fn callbacks are run in order. When one fails, the failure
 * is logged with the source location that queued it and undo_fn callbacks
 * (asserted to be non-NULL) are invoked.
 * NOTE(review): the loop selecting which entries get undone is not visible
 * here - presumably the ones already executed, in reverse order; confirm.
 *
 * In all cases the progress is logged, the transaction is stopped through
 * out_trans_stop() with rc as its result, and ta->ta_handle is cleared.
 */
101 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
103 struct tgt_session_info *tsi = tgt_ses_info(env);
107 LASSERT(ta->ta_handle);
109 if (ta->ta_err != 0 || ta->ta_argno == 0)
110 GOTO(stop, rc = ta->ta_err);
112 rc = out_trans_start(env, ta);
116 for (i = 0; i < ta->ta_argno; i++) {
117 rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
120 CDEBUG(D_INFO, "error during execution of #%u from"
121 " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
122 ta->ta_args[i].line, rc);
124 LASSERTF(ta->ta_args[i].undo_fn != NULL,
125 "can't undo changes, hope for failover!\n");
126 ta->ta_args[i].undo_fn(env, ta->ta_handle,
133 /* Only fail for real update */
134 tsi->tsi_reply_fail_id = OBD_FAIL_UPDATE_OBJ_NET_REP;
136 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
137 dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
138 out_trans_stop(env, ta, rc);
139 ta->ta_handle = NULL;
/*
 * Default reply reconstruction: log the event and insert an empty reply
 * (no data, zero length) with result code 0 at position index of reply.
 * The obj argument is not used by the statements shown here.
 */
146 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
147 struct dt_object *obj, struct update_reply *reply,
150 CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
151 dt_obd_name(dt), reply, index, 0);
153 update_insert_reply(reply, NULL, 0, index, 0);
/*
 * Signature of a reply-reconstruction callback as passed to
 * out_check_resent(); out_reconstruct() has this shape.
 */
157 typedef void (*out_reconstruct_t)(const struct lu_env *env,
158 struct dt_device *dt,
159 struct dt_object *obj,
160 struct update_reply *reply,
/*
 * Detect a resent request whose update was already handled.
 *
 * The common case is a request without MSG_RESENT in its message flags.
 * For a resent request, req_xid_is_last() decides whether the reconstruct
 * callback is invoked to rebuild the reply at the given index; otherwise a
 * D_HA message is logged together with the export's lcd_last_xid.
 * NOTE(review): the return values of the three paths are not visible here -
 * the caller treats a non-zero result as "already executed"; confirm.
 */
163 static inline int out_check_resent(const struct lu_env *env,
164 struct dt_device *dt,
165 struct dt_object *obj,
166 struct ptlrpc_request *req,
167 out_reconstruct_t reconstruct,
168 struct update_reply *reply,
171 if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
174 if (req_xid_is_last(req)) {
175 reconstruct(env, dt, obj, reply, index);
178 DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
179 req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
/*
 * Destroy dt_obj within transaction th. The FID being destroyed is logged,
 * and dt_destroy() is called with the object's write lock held.
 */
183 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
188 CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
189 PFID(lu_object_fid(&dt_obj->do_lu)));
191 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
192 rc = dt_destroy(env, dt_obj, th);
193 dt_write_unlock(env, dt_obj);
199 * The xxx_undo callbacks are only used once execution has failed.
200 * Because all of the required resources are reserved in the
201 * declare phase, i.e. if declare succeeds, the following execution
202 * phase should succeed as well, so these undo callbacks
203 * should be useless most of the time in Phase I.
/*
 * Undo callback for a create: destroys arg->object through
 * out_obj_destroy(). A failure of the undo itself is reported with CERROR.
 */
205 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
210 rc = out_obj_destroy(env, arg->object, th);
212 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
213 dt_obd_name(th->th_dev), rc);
/*
 * Execute callback for a create: calls dt_create() on arg->object, with the
 * object's write lock held, using the attributes, allocation hint and
 * object format saved in arg->u.create. The result code is then inserted
 * into the reply at arg->index (no payload).
 */
217 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
220 struct dt_object *dt_obj = arg->object;
223 CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
224 dt_obd_name(th->th_dev),
225 PFID(lu_object_fid(&arg->object->do_lu)),
226 arg->u.create.dof.dof_type,
227 arg->u.create.attr.la_mode & S_IFMT);
229 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
230 rc = dt_create(env, dt_obj, &arg->u.create.attr,
231 &arg->u.create.hint, &arg->u.create.dof, th);
233 dt_write_unlock(env, dt_obj);
235 CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
236 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
238 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Declare the creation of obj in ta's transaction and queue its execution.
 *
 * The result of dt_declare_create() is stored in ta->ta_err. The pair
 * out_tx_create_exec / out_tx_create_undo is queued with tx_add_exec(),
 * tagged with the caller's file and line. A reference on obj is taken here
 * and released in out_trans_stop(). Copies of *attr, *parent_fid and *dof,
 * plus a zeroed allocation hint, are saved in the queued argument.
 * NOTE(review): out_create() passes a NULL FID for non-directories, so the
 * copy of *parent_fid is presumably guarded by a check not shown - confirm.
 */
243 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
244 struct lu_attr *attr, struct lu_fid *parent_fid,
245 struct dt_object_format *dof,
246 struct thandle_exec_args *ta,
247 struct update_reply *reply,
248 int index, char *file, int line)
252 LASSERT(ta->ta_handle != NULL);
253 ta->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
258 arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
262 /* release the object in out_trans_stop */
263 lu_object_get(&obj->do_lu);
265 arg->u.create.attr = *attr;
267 arg->u.create.fid = *parent_fid;
268 memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
269 arg->u.create.dof = *dof;
/*
 * Handler for an object create update.
 *
 * Parameter 0 of the update must be a wire obdo of exactly sizeof(*wobdo)
 * bytes. It is byte-swapped in place to CPU order, unpacked into the
 * per-thread obdo and converted to lu_attr; the object format type is
 * derived from the mode. For a directory, parameter 1 must be a FID of
 * exactly sizeof(*fid) bytes that is sane after conversion to CPU order.
 * Any malformed parameter is rejected with err_serious(-EPROTO).
 *
 * The create is then queued with out_tx_create() against the per-thread
 * reply buffer and reply index.
 * NOTE(review): the action taken when the object already exists is not
 * visible here.
 */
276 static int out_create(struct tgt_session_info *tsi)
278 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
279 struct update *update = tti->tti_u.update.tti_update;
280 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
281 struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
282 struct obdo *lobdo = &tti->tti_u.update.tti_obdo;
283 struct lu_attr *attr = &tti->tti_attr;
284 struct lu_fid *fid = NULL;
291 wobdo = update_param_buf(update, 0, &size);
292 if (wobdo == NULL || size != sizeof(*wobdo)) {
293 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
294 tgt_name(tsi->tsi_tgt), -EPROTO);
295 RETURN(err_serious(-EPROTO));
298 obdo_le_to_cpu(wobdo, wobdo);
299 lustre_get_wire_obdo(NULL, lobdo, wobdo);
300 la_from_obdo(attr, lobdo, lobdo->o_valid);
302 dof->dof_type = dt_mode_to_dft(attr->la_mode);
303 if (S_ISDIR(attr->la_mode)) {
306 fid = update_param_buf(update, 1, &size);
307 if (fid == NULL || size != sizeof(*fid)) {
308 CERROR("%s: invalid fid: rc = %d\n",
309 tgt_name(tsi->tsi_tgt), -EPROTO);
310 RETURN(err_serious(-EPROTO));
312 fid_le_to_cpu(fid, fid);
313 if (!fid_is_sane(fid)) {
314 CERROR("%s: invalid fid "DFID": rc = %d\n",
315 tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
316 RETURN(err_serious(-EPROTO));
320 if (lu_object_exists(&obj->do_lu))
323 rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
325 tti->tti_u.update.tti_update_reply,
326 tti->tti_u.update.tti_update_reply_index);
/*
 * Undo callback for attr_set. Rolling back is not implemented: the
 * statement shown only logs an error naming the object's FID together
 * with -ENOTSUPP.
 */
331 static int out_tx_attr_set_undo(const struct lu_env *env,
332 struct thandle *th, struct tx_arg *arg)
334 CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
335 dt_obd_name(th->th_dev),
336 PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
/*
 * Execute callback for attr_set: applies arg->u.attr_set.attr to
 * arg->object with dt_attr_set() under the object's write lock, then
 * inserts the result code into the reply at arg->index (no payload).
 */
341 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
344 struct dt_object *dt_obj = arg->object;
347 CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
348 PFID(lu_object_fid(&dt_obj->do_lu)));
350 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
351 rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
352 dt_write_unlock(env, dt_obj);
354 CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
355 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
357 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Declare an attribute update of dt_obj in th's transaction (result kept in
 * th->ta_err) and queue out_tx_attr_set_exec / out_tx_attr_set_undo. The
 * queued argument holds a new reference on dt_obj and a copy of *attr.
 * Note that th here is the thandle_exec_args, not a struct thandle.
 */
362 static int __out_tx_attr_set(const struct lu_env *env,
363 struct dt_object *dt_obj,
364 const struct lu_attr *attr,
365 struct thandle_exec_args *th,
366 struct update_reply *reply, int index,
367 char *file, int line)
371 LASSERT(th->ta_handle != NULL);
372 th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
376 arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
379 lu_object_get(&dt_obj->do_lu);
380 arg->object = dt_obj;
381 arg->u.attr_set.attr = *attr;
/*
 * Handler for an attribute-set update. Parameter 0 must be a wire obdo of
 * exactly sizeof(*wobdo) bytes, otherwise err_serious(-EPROTO) is returned.
 * The obdo is byte-swapped in place to CPU order, unpacked and converted to
 * lu_attr, and the change is queued with out_tx_attr_set() on the
 * per-thread transaction arguments (tti_tea).
 */
387 static int out_attr_set(struct tgt_session_info *tsi)
389 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
390 struct update *update = tti->tti_u.update.tti_update;
391 struct lu_attr *attr = &tti->tti_attr;
392 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
393 struct obdo *lobdo = &tti->tti_u.update.tti_obdo;
400 wobdo = update_param_buf(update, 0, &size);
401 if (wobdo == NULL || size != sizeof(*wobdo)) {
402 CERROR("%s: empty obdo in the update: rc = %d\n",
403 tgt_name(tsi->tsi_tgt), -EPROTO);
404 RETURN(err_serious(-EPROTO));
409 obdo_le_to_cpu(wobdo, wobdo);
410 lustre_get_wire_obdo(NULL, lobdo, wobdo);
411 la_from_obdo(attr, lobdo, lobdo->o_valid);
413 rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
414 tti->tti_u.update.tti_update_reply,
415 tti->tti_u.update.tti_update_reply_index);
/*
 * Handler for an attribute-get update (read-only, no transaction).
 *
 * The attributes are read with dt_attr_get() under the object's read lock.
 * For a directory an index iterator is additionally opened and stepped at
 * most three times to determine whether the directory is empty; the
 * outcome is reported through la_flags as described in the comment below.
 * The attributes are then converted to an obdo, byte-swapped to
 * little-endian and packed in place, and the obdo is inserted into the
 * reply together with rc.
 * NOTE(review): the value returned for a non-existent object and the exact
 * la_flags assignments are not visible here.
 */
420 static int out_attr_get(struct tgt_session_info *tsi)
422 const struct lu_env *env = tsi->tsi_env;
423 struct tgt_thread_info *tti = tgt_th_info(env);
424 struct obdo *obdo = &tti->tti_u.update.tti_obdo;
425 struct lu_attr *la = &tti->tti_attr;
426 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
431 if (!lu_object_exists(&obj->do_lu))
434 dt_read_lock(env, obj, MOR_TGT_CHILD);
435 rc = dt_attr_get(env, obj, la, NULL);
437 GOTO(out_unlock, rc);
439 * If it is a directory, we will also check whether the
440 * directory is empty.
441 * la_flags = 0 : Empty.
445 if (S_ISDIR(la->la_mode)) {
447 const struct dt_it_ops *iops;
449 if (!dt_try_as_dir(env, obj))
450 GOTO(out_unlock, rc = -ENOTDIR);
452 iops = &obj->do_index_ops->dio_it;
453 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
456 result = iops->get(env, it, (const void *)"");
459 for (result = 0, i = 0; result == 0 && i < 3;
461 result = iops->next(env, it);
464 } else if (result == 0)
466 * Huh? Index contains no zero key?
476 obdo_from_la(obdo, la, la->la_valid);
477 obdo_cpu_to_le(obdo, obdo);
478 lustre_set_wire_obdo(NULL, obdo, obdo);
481 dt_read_unlock(env, obj);
483 CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
484 tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
487 update_insert_reply(tti->tti_u.update.tti_update_reply, obdo,
489 tti->tti_u.update.tti_update_reply_index, rc);
/*
 * Handler for an extended-attribute get update (read-only).
 *
 * Parameter 0 of the update is the xattr name; a missing name is rejected
 * with err_serious(-EPROTO). The value is read by dt_xattr_get(), under the
 * object's read lock, directly into the reply slot for this update: the
 * destination starts sizeof(int) bytes into the slot because the leading
 * int holds the result code. On completion ur_lens[idx] is set to the
 * buffer length plus that leading int.
 * NOTE(review): lb_len is set to UPDATE_BUFFER_SIZE minus
 * sizeof(struct update_reply) regardless of where ptr lies inside the reply
 * buffer - confirm this cannot exceed the space remaining after ptr.
 */
493 static int out_xattr_get(struct tgt_session_info *tsi)
495 const struct lu_env *env = tsi->tsi_env;
496 struct tgt_thread_info *tti = tgt_th_info(env);
497 struct update *update = tti->tti_u.update.tti_update;
498 struct lu_buf *lbuf = &tti->tti_buf;
499 struct update_reply *reply = tti->tti_u.update.tti_update_reply;
500 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
503 int idx = tti->tti_u.update.tti_update_reply_index;
508 name = (char *)update_param_buf(update, 0, NULL);
510 CERROR("%s: empty name for xattr get: rc = %d\n",
511 tgt_name(tsi->tsi_tgt), -EPROTO);
512 RETURN(err_serious(-EPROTO));
515 ptr = update_get_buf_internal(reply, idx, NULL);
516 LASSERT(ptr != NULL);
518 /* The first 4 bytes(int) are used to store the result */
519 lbuf->lb_buf = (char *)ptr + sizeof(int);
520 lbuf->lb_len = UPDATE_BUFFER_SIZE - sizeof(struct update_reply);
521 dt_read_lock(env, obj, MOR_TGT_CHILD);
522 rc = dt_xattr_get(env, obj, lbuf, name, NULL);
523 dt_read_unlock(env, obj);
530 GOTO(out, rc = -ENOENT);
534 CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
535 tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
536 name, (int)lbuf->lb_len);
542 reply->ur_lens[idx] = lbuf->lb_len + sizeof(int);
/*
 * Handler for an index (directory) lookup update (read-only).
 *
 * Parameter 0 of the update is the name to look up; a missing name is
 * rejected with err_serious(-EPROTO). With the object's read lock held, the
 * object must be usable as a directory (else -ENOTDIR) and dt_lookup()
 * stores the found FID in tti->tti_fid1. The FID is converted to
 * little-endian in place and inserted into the reply together with rc.
 * NOTE(review): the value returned for a non-existent object is not
 * visible here.
 */
547 static int out_index_lookup(struct tgt_session_info *tsi)
549 const struct lu_env *env = tsi->tsi_env;
550 struct tgt_thread_info *tti = tgt_th_info(env);
551 struct update *update = tti->tti_u.update.tti_update;
552 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
558 if (!lu_object_exists(&obj->do_lu))
561 name = (char *)update_param_buf(update, 0, NULL);
563 CERROR("%s: empty name for lookup: rc = %d\n",
564 tgt_name(tsi->tsi_tgt), -EPROTO);
565 RETURN(err_serious(-EPROTO));
568 dt_read_lock(env, obj, MOR_TGT_CHILD);
569 if (!dt_try_as_dir(env, obj))
570 GOTO(out_unlock, rc = -ENOTDIR);
572 rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
573 (struct dt_key *)name, NULL);
576 GOTO(out_unlock, rc);
581 CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
582 PFID(lu_object_fid(&obj->do_lu)), name,
583 PFID(&tti->tti_fid1), rc);
584 fid_cpu_to_le(&tti->tti_fid1, &tti->tti_fid1);
587 dt_read_unlock(env, obj);
589 CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
590 tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
593 update_insert_reply(tti->tti_u.update.tti_update_reply,
594 &tti->tti_fid1, sizeof(tti->tti_fid1),
595 tti->tti_u.update.tti_update_reply_index, rc);
/*
 * Execute callback for xattr_set: writes the buffer saved in
 * arg->u.xattr_set under the saved name and flags with dt_xattr_set(),
 * holding the object's write lock. A failure is special-cased when the
 * attribute name equals XATTR_NAME_LINK (see the comment below). The
 * result code is inserted into the reply at arg->index (no payload).
 */
599 static int out_tx_xattr_set_exec(const struct lu_env *env,
603 struct dt_object *dt_obj = arg->object;
606 CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
607 dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
608 arg->u.xattr_set.name, arg->u.xattr_set.flags);
610 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
611 rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
612 arg->u.xattr_set.name, arg->u.xattr_set.flags,
614 dt_write_unlock(env, dt_obj);
616 * Ignore errors if this is LINK EA
618 if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
621 CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
622 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
624 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Declare an xattr write on dt_obj in ta's transaction (result kept in
 * ta->ta_err) and queue out_tx_xattr_set_exec. No undo callback is
 * registered (NULL is passed to tx_add_exec()). The queued argument holds a
 * new reference on dt_obj, the name pointer (not a copy of the string), the
 * flags, a copy of the lu_buf descriptor *buf and a zero csum.
 */
629 static int __out_tx_xattr_set(const struct lu_env *env,
630 struct dt_object *dt_obj,
631 const struct lu_buf *buf,
632 const char *name, int flags,
633 struct thandle_exec_args *ta,
634 struct update_reply *reply, int index,
635 char *file, int line)
639 LASSERT(ta->ta_handle != NULL);
640 ta->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
641 flags, ta->ta_handle);
645 arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
647 lu_object_get(&dt_obj->do_lu);
648 arg->object = dt_obj;
649 arg->u.xattr_set.name = name;
650 arg->u.xattr_set.flags = flags;
651 arg->u.xattr_set.buf = *buf;
654 arg->u.xattr_set.csum = 0;
/*
 * Handler for an extended-attribute set update.
 *
 * Update parameters: 0 is the xattr name, 1 is the value (must be non-NULL
 * and non-empty), 2 is the flags word, read as a little-endian 32-bit
 * value. A missing parameter is rejected with err_serious(-EPROTO). The
 * write is queued with out_tx_xattr_set() using the per-thread lu_buf.
 */
658 static int out_xattr_set(struct tgt_session_info *tsi)
660 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
661 struct update *update = tti->tti_u.update.tti_update;
662 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
663 struct lu_buf *lbuf = &tti->tti_buf;
672 name = update_param_buf(update, 0, NULL);
674 CERROR("%s: empty name for xattr set: rc = %d\n",
675 tgt_name(tsi->tsi_tgt), -EPROTO);
676 RETURN(err_serious(-EPROTO));
679 buf = (char *)update_param_buf(update, 1, &buf_len);
680 if (buf == NULL || buf_len == 0) {
681 CERROR("%s: empty buf for xattr set: rc = %d\n",
682 tgt_name(tsi->tsi_tgt), -EPROTO);
683 RETURN(err_serious(-EPROTO));
687 lbuf->lb_len = buf_len;
689 tmp = (char *)update_param_buf(update, 2, NULL);
691 CERROR("%s: empty flag for xattr set: rc = %d\n",
692 tgt_name(tsi->tsi_tgt), -EPROTO);
693 RETURN(err_serious(-EPROTO));
696 flag = le32_to_cpu(*(int *)tmp);
698 rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
700 tti->tti_u.update.tti_update_reply,
701 tti->tti_u.update.tti_update_reply_index);
/*
 * Increase the reference count of dt_obj within transaction th by calling
 * dt_ref_add() with the object's write lock held.
 */
705 static int out_obj_ref_add(const struct lu_env *env,
706 struct dt_object *dt_obj,
711 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
712 rc = dt_ref_add(env, dt_obj, th);
713 dt_write_unlock(env, dt_obj);
/*
 * Decrease the reference count of dt_obj within transaction th by calling
 * dt_ref_del() with the object's write lock held.
 */
718 static int out_obj_ref_del(const struct lu_env *env,
719 struct dt_object *dt_obj,
724 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
725 rc = dt_ref_del(env, dt_obj, th);
726 dt_write_unlock(env, dt_obj);
/*
 * Execute callback for ref_add: bumps the reference count of arg->object
 * through out_obj_ref_add() and inserts the result code into the reply at
 * arg->index (no payload).
 */
731 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
734 struct dt_object *dt_obj = arg->object;
737 rc = out_obj_ref_add(env, dt_obj, th);
739 CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
740 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
742 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Undo callback for ref_add: reverses the increment by dropping one
 * reference on arg->object via out_obj_ref_del().
 */
746 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
749 return out_obj_ref_del(env, arg->object, th);
/*
 * Declare a reference increment on dt_obj in ta's transaction (result kept
 * in ta->ta_err) and queue out_tx_ref_add_exec / out_tx_ref_add_undo. The
 * queued argument holds a new reference on dt_obj.
 */
752 static int __out_tx_ref_add(const struct lu_env *env,
753 struct dt_object *dt_obj,
754 struct thandle_exec_args *ta,
755 struct update_reply *reply,
756 int index, char *file, int line)
760 LASSERT(ta->ta_handle != NULL);
761 ta->ta_err = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
765 arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
768 lu_object_get(&dt_obj->do_lu);
769 arg->object = dt_obj;
/*
 * Handler for a ref_add update: queues a reference increment on the
 * per-thread target object with out_tx_ref_add(), using the per-thread
 * transaction arguments, reply buffer and reply index.
 */
776 * increase ref of the object
778 static int out_ref_add(struct tgt_session_info *tsi)
780 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
781 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
786 rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
787 tti->tti_u.update.tti_update_reply,
788 tti->tti_u.update.tti_update_reply_index);
792 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
795 struct dt_object *dt_obj = arg->object;
798 rc = out_obj_ref_del(env, dt_obj, th);
800 CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
801 dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
803 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Undo callback for ref_del: reverses the decrement by taking one
 * reference on arg->object via out_obj_ref_add().
 */
808 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
811 return out_obj_ref_add(env, arg->object, th);
/*
 * Declare a reference decrement on dt_obj in ta's transaction (result kept
 * in ta->ta_err) and queue out_tx_ref_del_exec / out_tx_ref_del_undo. The
 * queued argument holds a new reference on dt_obj.
 */
814 static int __out_tx_ref_del(const struct lu_env *env,
815 struct dt_object *dt_obj,
816 struct thandle_exec_args *ta,
817 struct update_reply *reply,
818 int index, char *file, int line)
822 LASSERT(ta->ta_handle != NULL);
823 ta->ta_err = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
827 arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
830 lu_object_get(&dt_obj->do_lu);
831 arg->object = dt_obj;
/*
 * Handler for a ref_del update: after checking that the per-thread target
 * object exists, queues a reference decrement with out_tx_ref_del().
 * NOTE(review): the value returned for a non-existent object is not
 * visible here.
 */
837 static int out_ref_del(struct tgt_session_info *tsi)
839 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
840 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
845 if (!lu_object_exists(&obj->do_lu))
848 rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
849 tti->tti_u.update.tti_update_reply,
850 tti->tti_u.update.tti_update_reply_index);
/*
 * Insert the record rec under key into the index object dt_obj within
 * transaction th. The key is logged as a string and the record as a FID.
 * dt_obj is first checked with dt_try_as_dir(); the insert itself runs
 * under the object's write lock.
 * NOTE(review): the value returned when dt_try_as_dir() fails is not
 * visible here.
 */
854 static int out_obj_index_insert(const struct lu_env *env,
855 struct dt_object *dt_obj,
856 const struct dt_rec *rec,
857 const struct dt_key *key,
862 CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
863 dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
864 (char *)key, PFID((struct lu_fid *)rec));
866 if (dt_try_as_dir(env, dt_obj) == 0)
869 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
870 rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
871 dt_write_unlock(env, dt_obj);
/*
 * Delete the entry key from the index object dt_obj within transaction th.
 * dt_obj is first checked with dt_try_as_dir(); the delete itself runs
 * under the object's write lock.
 * NOTE(review): the value returned when dt_try_as_dir() fails is not
 * visible here.
 */
876 static int out_obj_index_delete(const struct lu_env *env,
877 struct dt_object *dt_obj,
878 const struct dt_key *key,
883 CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
884 dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
887 if (dt_try_as_dir(env, dt_obj) == 0)
890 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
891 rc = dt_delete(env, dt_obj, key, th, NULL);
892 dt_write_unlock(env, dt_obj);
/*
 * Execute callback for index insert: inserts the saved record/key pair
 * (arg->u.insert) into arg->object through out_obj_index_insert() and
 * inserts the result code into the reply at arg->index (no payload).
 */
897 static int out_tx_index_insert_exec(const struct lu_env *env,
898 struct thandle *th, struct tx_arg *arg)
900 struct dt_object *dt_obj = arg->object;
903 rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
904 arg->u.insert.key, th);
906 CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
907 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
909 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Undo callback for index insert: removes the saved key from arg->object
 * via out_obj_index_delete().
 */
914 static int out_tx_index_insert_undo(const struct lu_env *env,
915 struct thandle *th, struct tx_arg *arg)
917 return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
/*
 * Declare an index insert of (name -> fid) into dt_obj and queue
 * out_tx_index_insert_exec / out_tx_index_insert_undo.
 *
 * When dt_obj already exists it must be usable as a directory, otherwise
 * ta->ta_err is set to -ENOTDIR; the insert is declared with
 * dt_declare_insert(). The queued argument holds a new reference on dt_obj
 * and the fid/name pointers themselves (cast to dt_rec/dt_key), not copies.
 */
920 static int __out_tx_index_insert(const struct lu_env *env,
921 struct dt_object *dt_obj,
922 char *name, struct lu_fid *fid,
923 struct thandle_exec_args *ta,
924 struct update_reply *reply,
925 int index, char *file, int line)
929 LASSERT(ta->ta_handle != NULL);
931 if (lu_object_exists(&dt_obj->do_lu)) {
932 if (dt_try_as_dir(env, dt_obj) == 0) {
933 ta->ta_err = -ENOTDIR;
936 ta->ta_err = dt_declare_insert(env, dt_obj,
937 (struct dt_rec *)fid,
938 (struct dt_key *)name,
945 arg = tx_add_exec(ta, out_tx_index_insert_exec,
946 out_tx_index_insert_undo, file,
949 lu_object_get(&dt_obj->do_lu);
950 arg->object = dt_obj;
953 arg->u.insert.rec = (struct dt_rec *)fid;
954 arg->u.insert.key = (struct dt_key *)name;
/*
 * Handler for an index insert update.
 *
 * Update parameters: 0 is the entry name, 1 is the FID to store, which must
 * be exactly sizeof(*fid) bytes and sane after in-place conversion to CPU
 * byte order. Malformed parameters are rejected with err_serious(-EPROTO).
 * The insert is queued with out_tx_index_insert().
 */
959 static int out_index_insert(struct tgt_session_info *tsi)
961 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
962 struct update *update = tti->tti_u.update.tti_update;
963 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
971 name = (char *)update_param_buf(update, 0, NULL);
973 CERROR("%s: empty name for index insert: rc = %d\n",
974 tgt_name(tsi->tsi_tgt), -EPROTO);
975 RETURN(err_serious(-EPROTO));
978 fid = (struct lu_fid *)update_param_buf(update, 1, &size);
979 if (fid == NULL || size != sizeof(*fid)) {
980 CERROR("%s: invalid fid: rc = %d\n",
981 tgt_name(tsi->tsi_tgt), -EPROTO);
982 RETURN(err_serious(-EPROTO));
985 fid_le_to_cpu(fid, fid);
986 if (!fid_is_sane(fid)) {
987 CERROR("%s: invalid FID "DFID": rc = %d\n",
988 tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
989 RETURN(err_serious(-EPROTO));
992 rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
994 tti->tti_u.update.tti_update_reply,
995 tti->tti_u.update.tti_update_reply_index);
999 static int out_tx_index_delete_exec(const struct lu_env *env,
1005 rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1007 CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
1008 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1010 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Undo callback for index delete. Rolling back is not implemented: the
 * statement shown only logs an error together with -ENOTSUPP.
 */
1015 static int out_tx_index_delete_undo(const struct lu_env *env,
1019 CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1020 dt_obd_name(th->th_dev), -ENOTSUPP);
/*
 * Declare the removal of entry name from dt_obj and queue
 * out_tx_index_delete_exec / out_tx_index_delete_undo.
 *
 * dt_obj must be usable as a directory, otherwise ta->ta_err is set to
 * -ENOTDIR. The result of dt_declare_delete() is stored in ta->ta_err and
 * checked before anything is queued. The queued argument holds a new
 * reference on dt_obj and the name pointer itself, stored in the
 * u.insert.key member.
 */
1024 static int __out_tx_index_delete(const struct lu_env *env,
1025 struct dt_object *dt_obj, char *name,
1026 struct thandle_exec_args *ta,
1027 struct update_reply *reply,
1028 int index, char *file, int line)
1032 if (dt_try_as_dir(env, dt_obj) == 0) {
1033 ta->ta_err = -ENOTDIR;
1037 LASSERT(ta->ta_handle != NULL);
1038 ta->ta_err = dt_declare_delete(env, dt_obj,
1039 (struct dt_key *)name,
1041 if (ta->ta_err != 0)
1044 arg = tx_add_exec(ta, out_tx_index_delete_exec,
1045 out_tx_index_delete_undo, file,
1048 lu_object_get(&dt_obj->do_lu);
1049 arg->object = dt_obj;
1052 arg->u.insert.key = (struct dt_key *)name;
/*
 * Handler for an index delete update. The target object must exist and
 * parameter 0 of the update must carry the entry name (a missing name is
 * rejected with err_serious(-EPROTO)). The removal is queued with
 * out_tx_index_delete().
 * NOTE(review): the value returned for a non-existent object is not
 * visible here.
 */
1056 static int out_index_delete(struct tgt_session_info *tsi)
1058 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
1059 struct update *update = tti->tti_u.update.tti_update;
1060 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
1064 if (!lu_object_exists(&obj->do_lu))
1067 name = (char *)update_param_buf(update, 0, NULL);
1069 CERROR("%s: empty name for index delete: rc = %d\n",
1070 tgt_name(tsi->tsi_tgt), -EPROTO);
1071 RETURN(err_serious(-EPROTO));
1074 rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
1075 tti->tti_u.update.tti_update_reply,
1076 tti->tti_u.update.tti_update_reply_index);
/*
 * Execute callback for destroy: destroys arg->object through
 * out_obj_destroy() and inserts the result code into the reply at
 * arg->index (no payload).
 */
1080 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1083 struct dt_object *dt_obj = arg->object;
1086 rc = out_obj_destroy(env, dt_obj, th);
1088 CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1089 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1091 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Undo callback for destroy. Rolling back is not implemented: the
 * statement shown only logs an error together with -ENOTSUPP.
 */
1096 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1099 CERROR("%s: not support destroy undo yet!: rc = %d\n",
1100 dt_obd_name(th->th_dev), -ENOTSUPP);
/*
 * Declare the destruction of dt_obj in ta's transaction (result kept in
 * ta->ta_err) and queue out_tx_destroy_exec / out_tx_destroy_undo. The
 * queued argument holds a new reference on dt_obj.
 */
1104 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1105 struct thandle_exec_args *ta,
1106 struct update_reply *reply,
1107 int index, char *file, int line)
1111 LASSERT(ta->ta_handle != NULL);
1112 ta->ta_err = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1116 arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1119 lu_object_get(&dt_obj->do_lu);
1120 arg->object = dt_obj;
1126 static int out_destroy(struct tgt_session_info *tsi)
1128 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
1129 struct update *update = tti->tti_u.update.tti_update;
1130 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
1135 fid = &update->u_fid;
1136 fid_le_to_cpu(fid, fid);
1137 if (!fid_is_sane(fid)) {
1138 CERROR("%s: invalid FID "DFID": rc = %d\n",
1139 tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1140 RETURN(err_serious(-EPROTO));
1143 if (!lu_object_exists(&obj->do_lu))
1146 rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1147 tti->tti_u.update.tti_update_reply,
1148 tti->tti_u.update.tti_update_reply_index);
/*
 * Build one designated-initializer entry of the update handler table. The
 * entry is placed at index (opc - OBJ_CREATE) and carries the given flags
 * in th_flags.
 * NOTE(review): the members initialized from name and fn are not visible
 * here.
 */
1153 #define DEF_OUT_HNDL(opc, name, flags, fn) \
1154 [opc - OBJ_CREATE] = { \
1158 .th_flags = flags, \
1164 #define out_handler mdt_handler
1165 static struct tgt_handler out_update_ops[] = {
1166 DEF_OUT_HNDL(OBJ_CREATE, "obj_create", MUTABOR | HABEO_REFERO,
1168 DEF_OUT_HNDL(OBJ_DESTROY, "obj_create", MUTABOR | HABEO_REFERO,
1170 DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", MUTABOR | HABEO_REFERO,
1172 DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", MUTABOR | HABEO_REFERO,
1174 DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set", MUTABOR | HABEO_REFERO,
1176 DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get", HABEO_REFERO,
1178 DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", MUTABOR | HABEO_REFERO,
1180 DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", HABEO_REFERO,
1182 DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", HABEO_REFERO,
1184 DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert",
1185 MUTABOR | HABEO_REFERO, out_index_insert),
1186 DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete",
1187 MUTABOR | HABEO_REFERO, out_index_delete),
/*
 * Map an update opcode to its entry in out_update_ops. Opcodes in the range
 * [OBJ_CREATE, OBJ_LAST) index the table at (opc - OBJ_CREATE), and the
 * entry's th_opc is asserted to match. Any other opcode yields NULL.
 */
1190 struct tgt_handler *out_handler_find(__u32 opc)
1192 struct tgt_handler *h;
1195 if (OBJ_CREATE <= opc && opc < OBJ_LAST) {
1196 h = &out_update_ops[opc - OBJ_CREATE];
1197 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1200 h = NULL; /* unsupported opc */
1206 * Object updates between Targets. Because all the updates have been
1207 * disassembled into object updates on the sender side, OUT will
1208 * call the OSD API directly to execute these updates.
1210 * In DNE phase I all of the updates in the request need to be executed
1211 * in one transaction, and the transaction has to be synchronous.
1213 * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
/*
 * Entry point for an UPDATE_OBJ request.
 *
 * Request validation: the RMF_UPDATE buffer must be exactly
 * UPDATE_BUFFER_SIZE bytes, carry UPDATE_BUFFER_MAGIC and contain at least
 * one update; each violation is rejected with err_serious(-EPROTO). A reply
 * buffer of UPDATE_BUFFER_SIZE bytes is packed and initialized for count
 * entries, and kept in the per-thread info for the handlers.
 *
 * Execution: a transaction is started and the updates are walked in order.
 * When an update's u_batchid differs from the previous one, the current
 * transaction is ended with out_tx_end() and a new one is started. For each
 * update the FID is converted to CPU byte order in place and
 * sanity-checked, the object is located, and the object, the update and its
 * reply index are published in the per-thread info. The handler found by
 * out_handler_find() is then called; for MUTABOR handlers the resent check
 * runs first. An unknown opcode ends the walk with -ENOTSUPP. The object
 * reference taken by dt_locate() is dropped after each update, and the last
 * transaction is finished with out_tx_end().
 *
 * NOTE(review): off is advanced by sizes read from the request and no check
 * against bufsize is visible here - confirm a malformed count or update
 * size cannot walk past the request buffer.
 */
1216 int out_handle(struct tgt_session_info *tsi)
1218 const struct lu_env *env = tsi->tsi_env;
1219 struct tgt_thread_info *tti = tgt_th_info(env);
1220 struct thandle_exec_args *ta = &tti->tti_tea;
1221 struct req_capsule *pill = tsi->tsi_pill;
1222 struct dt_device *dt = tsi->tsi_tgt->lut_bottom;
1223 struct update_buf *ubuf;
1224 struct update *update;
1225 struct update_reply *update_reply;
1228 int old_batchid = -1;
1236 req_capsule_set(pill, &RQF_UPDATE_OBJ);
1237 bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
1238 if (bufsize != UPDATE_BUFFER_SIZE) {
1239 CERROR("%s: invalid bufsize %d: rc = %d\n",
1240 tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1241 RETURN(err_serious(-EPROTO));
1244 ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
1246 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1248 RETURN(err_serious(-EPROTO));
1251 if (ubuf->ub_magic != UPDATE_BUFFER_MAGIC) {
1252 CERROR("%s: invalid magic %x expect %x: rc = %d\n",
1253 tgt_name(tsi->tsi_tgt), ubuf->ub_magic,
1254 UPDATE_BUFFER_MAGIC, -EPROTO);
1255 RETURN(err_serious(-EPROTO));
1258 count = ubuf->ub_count;
1260 CERROR("%s: No update!: rc = %d\n",
1261 tgt_name(tsi->tsi_tgt), -EPROTO);
1262 RETURN(err_serious(-EPROTO));
1265 req_capsule_set_size(pill, &RMF_UPDATE_REPLY, RCL_SERVER,
1266 UPDATE_BUFFER_SIZE);
1267 rc = req_capsule_server_pack(pill);
1269 CERROR("%s: Can't pack response: rc = %d\n",
1270 tgt_name(tsi->tsi_tgt), rc);
1274 /* Prepare the update reply buffer */
1275 update_reply = req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
1276 if (update_reply == NULL)
1277 RETURN(err_serious(-EPROTO));
1278 update_init_reply_buf(update_reply, count);
1279 tti->tti_u.update.tti_update_reply = update_reply;
1281 rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1285 tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1287 /* Walk through updates in the request to execute them synchronously */
1288 off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
1289 for (i = 0; i < count; i++) {
1290 struct tgt_handler *h;
1291 struct dt_object *dt_obj;
1293 update = (struct update *)((char *)ubuf + off);
1294 if (old_batchid == -1) {
1295 old_batchid = update->u_batchid;
1296 } else if (old_batchid != update->u_batchid) {
1297 /* Stop the current update transaction,
1298 * create a new one */
1299 rc = out_tx_end(env, ta);
1303 rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1306 old_batchid = update->u_batchid;
1309 fid_le_to_cpu(&update->u_fid, &update->u_fid);
1310 if (!fid_is_sane(&update->u_fid)) {
1311 CERROR("%s: invalid FID "DFID": rc = %d\n",
1312 tgt_name(tsi->tsi_tgt), PFID(&update->u_fid),
1314 GOTO(out, rc = err_serious(-EPROTO));
1317 dt_obj = dt_locate(env, dt, &update->u_fid);
1319 GOTO(out, rc = PTR_ERR(dt_obj));
1321 tti->tti_u.update.tti_dt_object = dt_obj;
1322 tti->tti_u.update.tti_update = update;
1323 tti->tti_u.update.tti_update_reply_index = i;
1325 h = out_handler_find(update->u_type);
1326 if (likely(h != NULL)) {
1327 /* For real modification RPC, check if the update
1328 * has been executed */
1329 if (h->th_flags & MUTABOR) {
1330 struct ptlrpc_request *req = tgt_ses_req(tsi);
1332 if (out_check_resent(env, dt, dt_obj, req,
1338 rc = h->th_act(tsi);
1340 CERROR("%s: The unsupported opc: 0x%x\n",
1341 tgt_name(tsi->tsi_tgt), update->u_type);
1342 lu_object_put(env, &dt_obj->do_lu);
1343 GOTO(out, rc = -ENOTSUPP);
1346 lu_object_put(env, &dt_obj->do_lu);
1349 off += cfs_size_round(update_size(update));
1352 rc1 = out_tx_end(env, ta);
/*
 * Exported request handler table for the update service: it routes
 * UPDATE_OBJ requests to out_handle() and flags them MUTABOR.
 */
1358 struct tgt_handler tgt_out_handlers[] = {
1359 TGT_UPDATE_HDL(MUTABOR, UPDATE_OBJ, out_handle),
1361 EXPORT_SYMBOL(tgt_out_handlers);