4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2013, Intel Corporation.
25 * lustre/mdt/out_handler.c
27 * Object update handler between targets.
29 * Author: di.wang <di.wang@intel.com>
32 #define DEBUG_SUBSYSTEM S_CLASS
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
39 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
40 tx_exec_func_t undo, char *file, int line)
48 LASSERT(i < UPDATE_MAX_OPS);
52 ta->ta_args[i].exec_fn = func;
53 ta->ta_args[i].undo_fn = undo;
54 ta->ta_args[i].file = file;
55 ta->ta_args[i].line = line;
57 return &ta->ta_args[i];
60 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
61 struct thandle_exec_args *ta)
63 memset(ta, 0, sizeof(*ta));
64 ta->ta_handle = dt_trans_create(env, dt);
65 if (IS_ERR(ta->ta_handle)) {
66 CERROR("%s: start handle error: rc = %ld\n",
67 dt_obd_name(dt), PTR_ERR(ta->ta_handle));
68 return PTR_ERR(ta->ta_handle);
71 /*For phase I, sync for cross-ref operation*/
72 ta->ta_handle->th_sync = 1;
76 static int out_trans_start(const struct lu_env *env,
77 struct thandle_exec_args *ta)
79 /* Always do sync commit for Phase I */
80 LASSERT(ta->ta_handle->th_sync != 0);
81 return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
84 static int out_trans_stop(const struct lu_env *env,
85 struct thandle_exec_args *ta, int err)
90 ta->ta_handle->th_result = err;
91 LASSERT(ta->ta_handle->th_sync != 0);
92 rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
93 for (i = 0; i < ta->ta_argno; i++) {
94 if (ta->ta_args[i].object != NULL) {
95 lu_object_put(env, &ta->ta_args[i].object->do_lu);
96 ta->ta_args[i].object = NULL;
103 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
105 struct tgt_session_info *tsi = tgt_ses_info(env);
109 LASSERT(ta->ta_handle);
111 if (ta->ta_err != 0 || ta->ta_argno == 0)
112 GOTO(stop, rc = ta->ta_err);
114 rc = out_trans_start(env, ta);
118 for (i = 0; i < ta->ta_argno; i++) {
119 rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
122 CDEBUG(D_INFO, "error during execution of #%u from"
123 " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
124 ta->ta_args[i].line, rc);
126 LASSERTF(ta->ta_args[i].undo_fn != NULL,
127 "can't undo changes, hope for failover!\n");
128 ta->ta_args[i].undo_fn(env, ta->ta_handle,
135 /* Only fail for real update */
136 tsi->tsi_reply_fail_id = OBD_FAIL_UPDATE_OBJ_NET_REP;
138 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
139 dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
140 out_trans_stop(env, ta, rc);
141 ta->ta_handle = NULL;
148 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
149 struct dt_object *obj, struct update_reply *reply,
152 CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
153 dt_obd_name(dt), reply, index, 0);
155 update_insert_reply(reply, NULL, 0, index, 0);
159 typedef void (*out_reconstruct_t)(const struct lu_env *env,
160 struct dt_device *dt,
161 struct dt_object *obj,
162 struct update_reply *reply,
165 static inline int out_check_resent(const struct lu_env *env,
166 struct dt_device *dt,
167 struct dt_object *obj,
168 struct ptlrpc_request *req,
169 out_reconstruct_t reconstruct,
170 struct update_reply *reply,
173 if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
176 if (req_xid_is_last(req)) {
177 reconstruct(env, dt, obj, reply, index);
180 DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
181 req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
185 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
190 CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
191 PFID(lu_object_fid(&dt_obj->do_lu)));
193 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
194 rc = dt_destroy(env, dt_obj, th);
195 dt_write_unlock(env, dt_obj);
201 * All of the xxx_undo will be used once execution failed,
202 * But because all of the required resource has been reserved in
203 * declare phase, i.e. if declare succeed, it should make sure
204 * the following executing phase succeed in anyway, so these undo
205 * should be useless for most of the time in Phase I
207 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
212 rc = out_obj_destroy(env, arg->object, th);
214 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
215 dt_obd_name(th->th_dev), rc);
219 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
222 struct dt_object *dt_obj = arg->object;
225 CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
226 dt_obd_name(th->th_dev),
227 PFID(lu_object_fid(&arg->object->do_lu)),
228 arg->u.create.dof.dof_type,
229 arg->u.create.attr.la_mode & S_IFMT);
231 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
232 rc = dt_create(env, dt_obj, &arg->u.create.attr,
233 &arg->u.create.hint, &arg->u.create.dof, th);
235 dt_write_unlock(env, dt_obj);
237 CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
238 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
240 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
245 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
246 struct lu_attr *attr, struct lu_fid *parent_fid,
247 struct dt_object_format *dof,
248 struct thandle_exec_args *ta,
249 struct update_reply *reply,
250 int index, char *file, int line)
254 LASSERT(ta->ta_handle != NULL);
255 ta->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
260 arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
264 /* release the object in out_trans_stop */
265 lu_object_get(&obj->do_lu);
267 arg->u.create.attr = *attr;
269 arg->u.create.fid = *parent_fid;
270 memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
271 arg->u.create.dof = *dof;
278 static int out_create(struct tgt_session_info *tsi)
280 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
281 struct update *update = tti->tti_u.update.tti_update;
282 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
283 struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
284 struct obdo *lobdo = &tti->tti_u.update.tti_obdo;
285 struct lu_attr *attr = &tti->tti_attr;
286 struct lu_fid *fid = NULL;
293 wobdo = update_param_buf(update, 0, &size);
294 if (wobdo == NULL || size != sizeof(*wobdo)) {
295 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
296 tgt_name(tsi->tsi_tgt), -EPROTO);
297 RETURN(err_serious(-EPROTO));
300 obdo_le_to_cpu(wobdo, wobdo);
301 lustre_get_wire_obdo(NULL, lobdo, wobdo);
302 la_from_obdo(attr, lobdo, lobdo->o_valid);
304 dof->dof_type = dt_mode_to_dft(attr->la_mode);
305 if (S_ISDIR(attr->la_mode)) {
308 fid = update_param_buf(update, 1, &size);
309 if (fid == NULL || size != sizeof(*fid)) {
310 CERROR("%s: invalid fid: rc = %d\n",
311 tgt_name(tsi->tsi_tgt), -EPROTO);
312 RETURN(err_serious(-EPROTO));
314 fid_le_to_cpu(fid, fid);
315 if (!fid_is_sane(fid)) {
316 CERROR("%s: invalid fid "DFID": rc = %d\n",
317 tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
318 RETURN(err_serious(-EPROTO));
322 if (lu_object_exists(&obj->do_lu))
325 rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
327 tti->tti_u.update.tti_update_reply,
328 tti->tti_u.update.tti_update_reply_index);
333 static int out_tx_attr_set_undo(const struct lu_env *env,
334 struct thandle *th, struct tx_arg *arg)
336 CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
337 dt_obd_name(th->th_dev),
338 PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
343 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
346 struct dt_object *dt_obj = arg->object;
349 CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
350 PFID(lu_object_fid(&dt_obj->do_lu)));
352 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
353 rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
354 dt_write_unlock(env, dt_obj);
356 CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
357 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
359 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
364 static int __out_tx_attr_set(const struct lu_env *env,
365 struct dt_object *dt_obj,
366 const struct lu_attr *attr,
367 struct thandle_exec_args *th,
368 struct update_reply *reply, int index,
369 char *file, int line)
373 LASSERT(th->ta_handle != NULL);
374 th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
378 arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
381 lu_object_get(&dt_obj->do_lu);
382 arg->object = dt_obj;
383 arg->u.attr_set.attr = *attr;
389 static int out_attr_set(struct tgt_session_info *tsi)
391 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
392 struct update *update = tti->tti_u.update.tti_update;
393 struct lu_attr *attr = &tti->tti_attr;
394 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
395 struct obdo *lobdo = &tti->tti_u.update.tti_obdo;
402 wobdo = update_param_buf(update, 0, &size);
403 if (wobdo == NULL || size != sizeof(*wobdo)) {
404 CERROR("%s: empty obdo in the update: rc = %d\n",
405 tgt_name(tsi->tsi_tgt), -EPROTO);
406 RETURN(err_serious(-EPROTO));
411 obdo_le_to_cpu(wobdo, wobdo);
412 lustre_get_wire_obdo(NULL, lobdo, wobdo);
413 la_from_obdo(attr, lobdo, lobdo->o_valid);
415 rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
416 tti->tti_u.update.tti_update_reply,
417 tti->tti_u.update.tti_update_reply_index);
422 static int out_attr_get(struct tgt_session_info *tsi)
424 const struct lu_env *env = tsi->tsi_env;
425 struct tgt_thread_info *tti = tgt_th_info(env);
426 struct obdo *obdo = &tti->tti_u.update.tti_obdo;
427 struct lu_attr *la = &tti->tti_attr;
428 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
433 if (!lu_object_exists(&obj->do_lu))
436 dt_read_lock(env, obj, MOR_TGT_CHILD);
437 rc = dt_attr_get(env, obj, la, NULL);
439 GOTO(out_unlock, rc);
441 * If it is a directory, we will also check whether the
442 * directory is empty.
443 * la_flags = 0 : Empty.
447 if (S_ISDIR(la->la_mode)) {
449 const struct dt_it_ops *iops;
451 if (!dt_try_as_dir(env, obj))
452 GOTO(out_unlock, rc = -ENOTDIR);
454 iops = &obj->do_index_ops->dio_it;
455 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
458 result = iops->get(env, it, (const void *)"");
461 for (result = 0, i = 0; result == 0 && i < 3;
463 result = iops->next(env, it);
466 } else if (result == 0)
468 * Huh? Index contains no zero key?
478 obdo_from_la(obdo, la, la->la_valid);
479 obdo_cpu_to_le(obdo, obdo);
480 lustre_set_wire_obdo(NULL, obdo, obdo);
483 dt_read_unlock(env, obj);
485 CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
486 tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
489 update_insert_reply(tti->tti_u.update.tti_update_reply, obdo,
490 sizeof(*obdo), 0, rc);
494 static int out_xattr_get(struct tgt_session_info *tsi)
496 const struct lu_env *env = tsi->tsi_env;
497 struct tgt_thread_info *tti = tgt_th_info(env);
498 struct update *update = tti->tti_u.update.tti_update;
499 struct lu_buf *lbuf = &tti->tti_buf;
500 struct update_reply *reply = tti->tti_u.update.tti_update_reply;
501 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
508 name = (char *)update_param_buf(update, 0, NULL);
510 CERROR("%s: empty name for xattr get: rc = %d\n",
511 tgt_name(tsi->tsi_tgt), -EPROTO);
512 RETURN(err_serious(-EPROTO));
515 ptr = update_get_buf_internal(reply, 0, NULL);
516 LASSERT(ptr != NULL);
518 /* The first 4 bytes(int) are used to store the result */
519 lbuf->lb_buf = (char *)ptr + sizeof(int);
520 lbuf->lb_len = UPDATE_BUFFER_SIZE - sizeof(struct update_reply);
521 dt_read_lock(env, obj, MOR_TGT_CHILD);
522 rc = dt_xattr_get(env, obj, lbuf, name, NULL);
523 dt_read_unlock(env, obj);
530 GOTO(out, rc = -ENOENT);
534 CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
535 tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
536 name, (int)lbuf->lb_len);
539 reply->ur_lens[0] = lbuf->lb_len + sizeof(int);
543 static int out_index_lookup(struct tgt_session_info *tsi)
545 const struct lu_env *env = tsi->tsi_env;
546 struct tgt_thread_info *tti = tgt_th_info(env);
547 struct update *update = tti->tti_u.update.tti_update;
548 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
554 if (!lu_object_exists(&obj->do_lu))
557 name = (char *)update_param_buf(update, 0, NULL);
559 CERROR("%s: empty name for lookup: rc = %d\n",
560 tgt_name(tsi->tsi_tgt), -EPROTO);
561 RETURN(err_serious(-EPROTO));
564 dt_read_lock(env, obj, MOR_TGT_CHILD);
565 if (!dt_try_as_dir(env, obj))
566 GOTO(out_unlock, rc = -ENOTDIR);
568 rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
569 (struct dt_key *)name, NULL);
572 GOTO(out_unlock, rc);
577 CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
578 PFID(lu_object_fid(&obj->do_lu)), name,
579 PFID(&tti->tti_fid1), rc);
580 fid_cpu_to_le(&tti->tti_fid1, &tti->tti_fid1);
583 dt_read_unlock(env, obj);
585 CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
586 tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
589 update_insert_reply(tti->tti_u.update.tti_update_reply,
590 &tti->tti_fid1, sizeof(tti->tti_fid1), 0, rc);
594 static int out_tx_xattr_set_exec(const struct lu_env *env,
598 struct dt_object *dt_obj = arg->object;
601 CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
602 dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
603 arg->u.xattr_set.name, arg->u.xattr_set.flags);
605 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
606 rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
607 arg->u.xattr_set.name, arg->u.xattr_set.flags,
609 dt_write_unlock(env, dt_obj);
611 * Ignore errors if this is LINK EA
613 if (unlikely(rc && !strncmp(arg->u.xattr_set.name, XATTR_NAME_LINK,
614 strlen(XATTR_NAME_LINK))))
617 CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
618 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
620 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
625 static int __out_tx_xattr_set(const struct lu_env *env,
626 struct dt_object *dt_obj,
627 const struct lu_buf *buf,
628 const char *name, int flags,
629 struct thandle_exec_args *ta,
630 struct update_reply *reply, int index,
631 char *file, int line)
635 LASSERT(ta->ta_handle != NULL);
636 ta->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
637 flags, ta->ta_handle);
641 arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
643 lu_object_get(&dt_obj->do_lu);
644 arg->object = dt_obj;
645 arg->u.xattr_set.name = name;
646 arg->u.xattr_set.flags = flags;
647 arg->u.xattr_set.buf = *buf;
650 arg->u.xattr_set.csum = 0;
654 static int out_xattr_set(struct tgt_session_info *tsi)
656 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
657 struct update *update = tti->tti_u.update.tti_update;
658 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
659 struct lu_buf *lbuf = &tti->tti_buf;
668 name = update_param_buf(update, 0, NULL);
670 CERROR("%s: empty name for xattr set: rc = %d\n",
671 tgt_name(tsi->tsi_tgt), -EPROTO);
672 RETURN(err_serious(-EPROTO));
675 buf = (char *)update_param_buf(update, 1, &buf_len);
676 if (buf == NULL || buf_len == 0) {
677 CERROR("%s: empty buf for xattr set: rc = %d\n",
678 tgt_name(tsi->tsi_tgt), -EPROTO);
679 RETURN(err_serious(-EPROTO));
683 lbuf->lb_len = buf_len;
685 tmp = (char *)update_param_buf(update, 2, NULL);
687 CERROR("%s: empty flag for xattr set: rc = %d\n",
688 tgt_name(tsi->tsi_tgt), -EPROTO);
689 RETURN(err_serious(-EPROTO));
692 flag = le32_to_cpu(*(int *)tmp);
694 rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
696 tti->tti_u.update.tti_update_reply,
697 tti->tti_u.update.tti_update_reply_index);
701 static int out_obj_ref_add(const struct lu_env *env,
702 struct dt_object *dt_obj,
707 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
708 rc = dt_ref_add(env, dt_obj, th);
709 dt_write_unlock(env, dt_obj);
714 static int out_obj_ref_del(const struct lu_env *env,
715 struct dt_object *dt_obj,
720 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
721 rc = dt_ref_del(env, dt_obj, th);
722 dt_write_unlock(env, dt_obj);
727 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
730 struct dt_object *dt_obj = arg->object;
733 rc = out_obj_ref_add(env, dt_obj, th);
735 CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
736 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
738 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
742 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
745 return out_obj_ref_del(env, arg->object, th);
748 static int __out_tx_ref_add(const struct lu_env *env,
749 struct dt_object *dt_obj,
750 struct thandle_exec_args *ta,
751 struct update_reply *reply,
752 int index, char *file, int line)
756 LASSERT(ta->ta_handle != NULL);
757 ta->ta_err = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
761 arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
764 lu_object_get(&dt_obj->do_lu);
765 arg->object = dt_obj;
772 * increase ref of the object
774 static int out_ref_add(struct tgt_session_info *tsi)
776 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
777 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
782 rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
783 tti->tti_u.update.tti_update_reply,
784 tti->tti_u.update.tti_update_reply_index);
788 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
791 struct dt_object *dt_obj = arg->object;
794 rc = out_obj_ref_del(env, dt_obj, th);
796 CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
797 dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
799 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
804 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
807 return out_obj_ref_add(env, arg->object, th);
810 static int __out_tx_ref_del(const struct lu_env *env,
811 struct dt_object *dt_obj,
812 struct thandle_exec_args *ta,
813 struct update_reply *reply,
814 int index, char *file, int line)
818 LASSERT(ta->ta_handle != NULL);
819 ta->ta_err = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
823 arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
826 lu_object_get(&dt_obj->do_lu);
827 arg->object = dt_obj;
833 static int out_ref_del(struct tgt_session_info *tsi)
835 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
836 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
841 if (!lu_object_exists(&obj->do_lu))
844 rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
845 tti->tti_u.update.tti_update_reply,
846 tti->tti_u.update.tti_update_reply_index);
850 static int out_obj_index_insert(const struct lu_env *env,
851 struct dt_object *dt_obj,
852 const struct dt_rec *rec,
853 const struct dt_key *key,
858 CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
859 dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
860 (char *)key, PFID((struct lu_fid *)rec));
862 if (dt_try_as_dir(env, dt_obj) == 0)
865 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
866 rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
867 dt_write_unlock(env, dt_obj);
872 static int out_obj_index_delete(const struct lu_env *env,
873 struct dt_object *dt_obj,
874 const struct dt_key *key,
879 CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
880 dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
883 if (dt_try_as_dir(env, dt_obj) == 0)
886 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
887 rc = dt_delete(env, dt_obj, key, th, NULL);
888 dt_write_unlock(env, dt_obj);
893 static int out_tx_index_insert_exec(const struct lu_env *env,
894 struct thandle *th, struct tx_arg *arg)
896 struct dt_object *dt_obj = arg->object;
899 rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
900 arg->u.insert.key, th);
902 CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
903 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
905 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
910 static int out_tx_index_insert_undo(const struct lu_env *env,
911 struct thandle *th, struct tx_arg *arg)
913 return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
916 static int __out_tx_index_insert(const struct lu_env *env,
917 struct dt_object *dt_obj,
918 char *name, struct lu_fid *fid,
919 struct thandle_exec_args *ta,
920 struct update_reply *reply,
921 int index, char *file, int line)
925 LASSERT(ta->ta_handle != NULL);
927 if (lu_object_exists(&dt_obj->do_lu)) {
928 if (dt_try_as_dir(env, dt_obj) == 0) {
929 ta->ta_err = -ENOTDIR;
932 ta->ta_err = dt_declare_insert(env, dt_obj,
933 (struct dt_rec *)fid,
934 (struct dt_key *)name,
941 arg = tx_add_exec(ta, out_tx_index_insert_exec,
942 out_tx_index_insert_undo, file,
945 lu_object_get(&dt_obj->do_lu);
946 arg->object = dt_obj;
949 arg->u.insert.rec = (struct dt_rec *)fid;
950 arg->u.insert.key = (struct dt_key *)name;
955 static int out_index_insert(struct tgt_session_info *tsi)
957 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
958 struct update *update = tti->tti_u.update.tti_update;
959 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
967 name = (char *)update_param_buf(update, 0, NULL);
969 CERROR("%s: empty name for index insert: rc = %d\n",
970 tgt_name(tsi->tsi_tgt), -EPROTO);
971 RETURN(err_serious(-EPROTO));
974 fid = (struct lu_fid *)update_param_buf(update, 1, &size);
975 if (fid == NULL || size != sizeof(*fid)) {
976 CERROR("%s: invalid fid: rc = %d\n",
977 tgt_name(tsi->tsi_tgt), -EPROTO);
978 RETURN(err_serious(-EPROTO));
981 fid_le_to_cpu(fid, fid);
982 if (!fid_is_sane(fid)) {
983 CERROR("%s: invalid FID "DFID": rc = %d\n",
984 tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
985 RETURN(err_serious(-EPROTO));
988 rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
990 tti->tti_u.update.tti_update_reply,
991 tti->tti_u.update.tti_update_reply_index);
995 static int out_tx_index_delete_exec(const struct lu_env *env,
1001 rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1003 CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
1004 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1006 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
1011 static int out_tx_index_delete_undo(const struct lu_env *env,
1015 CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1016 dt_obd_name(th->th_dev), -ENOTSUPP);
1020 static int __out_tx_index_delete(const struct lu_env *env,
1021 struct dt_object *dt_obj, char *name,
1022 struct thandle_exec_args *ta,
1023 struct update_reply *reply,
1024 int index, char *file, int line)
1028 if (dt_try_as_dir(env, dt_obj) == 0) {
1029 ta->ta_err = -ENOTDIR;
1033 LASSERT(ta->ta_handle != NULL);
1034 ta->ta_err = dt_declare_delete(env, dt_obj,
1035 (struct dt_key *)name,
1037 if (ta->ta_err != 0)
1040 arg = tx_add_exec(ta, out_tx_index_delete_exec,
1041 out_tx_index_delete_undo, file,
1044 lu_object_get(&dt_obj->do_lu);
1045 arg->object = dt_obj;
1048 arg->u.insert.key = (struct dt_key *)name;
1052 static int out_index_delete(struct tgt_session_info *tsi)
1054 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
1055 struct update *update = tti->tti_u.update.tti_update;
1056 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
1060 if (!lu_object_exists(&obj->do_lu))
1063 name = (char *)update_param_buf(update, 0, NULL);
1065 CERROR("%s: empty name for index delete: rc = %d\n",
1066 tgt_name(tsi->tsi_tgt), -EPROTO);
1067 RETURN(err_serious(-EPROTO));
1070 rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
1071 tti->tti_u.update.tti_update_reply,
1072 tti->tti_u.update.tti_update_reply_index);
1076 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1079 struct dt_object *dt_obj = arg->object;
1082 rc = out_obj_destroy(env, dt_obj, th);
1084 CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1085 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1087 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
1092 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1095 CERROR("%s: not support destroy undo yet!: rc = %d\n",
1096 dt_obd_name(th->th_dev), -ENOTSUPP);
1100 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1101 struct thandle_exec_args *ta,
1102 struct update_reply *reply,
1103 int index, char *file, int line)
1107 LASSERT(ta->ta_handle != NULL);
1108 ta->ta_err = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1112 arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1115 lu_object_get(&dt_obj->do_lu);
1116 arg->object = dt_obj;
1122 static int out_destroy(struct tgt_session_info *tsi)
1124 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
1125 struct update *update = tti->tti_u.update.tti_update;
1126 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
1131 fid = &update->u_fid;
1132 fid_le_to_cpu(fid, fid);
1133 if (!fid_is_sane(fid)) {
1134 CERROR("%s: invalid FID "DFID": rc = %d\n",
1135 tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1136 RETURN(err_serious(-EPROTO));
1139 if (!lu_object_exists(&obj->do_lu))
1142 rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1143 tti->tti_u.update.tti_update_reply,
1144 tti->tti_u.update.tti_update_reply_index);
1149 #define DEF_OUT_HNDL(opc, name, flags, fn) \
1150 [opc - OBJ_CREATE] = { \
1154 .th_flags = flags, \
1160 #define out_handler mdt_handler
1161 static struct tgt_handler out_update_ops[] = {
1162 DEF_OUT_HNDL(OBJ_CREATE, "obj_create", MUTABOR | HABEO_REFERO,
1164 DEF_OUT_HNDL(OBJ_DESTROY, "obj_create", MUTABOR | HABEO_REFERO,
1166 DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", MUTABOR | HABEO_REFERO,
1168 DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", MUTABOR | HABEO_REFERO,
1170 DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set", MUTABOR | HABEO_REFERO,
1172 DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get", HABEO_REFERO,
1174 DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", MUTABOR | HABEO_REFERO,
1176 DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", HABEO_REFERO,
1178 DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", HABEO_REFERO,
1180 DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert",
1181 MUTABOR | HABEO_REFERO, out_index_insert),
1182 DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete",
1183 MUTABOR | HABEO_REFERO, out_index_delete),
1186 struct tgt_handler *out_handler_find(__u32 opc)
1188 struct tgt_handler *h;
1191 if (OBJ_CREATE <= opc && opc < OBJ_LAST) {
1192 h = &out_update_ops[opc - OBJ_CREATE];
1193 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1196 h = NULL; /* unsupported opc */
1202 * Object updates between Targets. Because all the updates has been
1203 * dis-assemblied into object updates at sender side, so OUT will
1204 * call OSD API directly to execute these updates.
1206 * In DNE phase I all of the updates in the request need to be executed
1207 * in one transaction, and the transaction has to be synchronously.
1209 * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1212 int out_handle(struct tgt_session_info *tsi)
1214 const struct lu_env *env = tsi->tsi_env;
1215 struct tgt_thread_info *tti = tgt_th_info(env);
1216 struct thandle_exec_args *ta = &tti->tti_tea;
1217 struct req_capsule *pill = tsi->tsi_pill;
1218 struct dt_device *dt = tsi->tsi_tgt->lut_bottom;
1219 struct update_buf *ubuf;
1220 struct update *update;
1221 struct update_reply *update_reply;
1224 int old_batchid = -1;
1232 req_capsule_set(pill, &RQF_UPDATE_OBJ);
1233 bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
1234 if (bufsize != UPDATE_BUFFER_SIZE) {
1235 CERROR("%s: invalid bufsize %d: rc = %d\n",
1236 tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1237 RETURN(err_serious(-EPROTO));
1240 ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
1242 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1244 RETURN(err_serious(-EPROTO));
1247 if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
1248 CERROR("%s: invalid magic %x expect %x: rc = %d\n",
1249 tgt_name(tsi->tsi_tgt), le32_to_cpu(ubuf->ub_magic),
1250 UPDATE_BUFFER_MAGIC, -EPROTO);
1251 RETURN(err_serious(-EPROTO));
1254 count = le32_to_cpu(ubuf->ub_count);
1256 CERROR("%s: No update!: rc = %d\n",
1257 tgt_name(tsi->tsi_tgt), -EPROTO);
1258 RETURN(err_serious(-EPROTO));
1261 req_capsule_set_size(pill, &RMF_UPDATE_REPLY, RCL_SERVER,
1262 UPDATE_BUFFER_SIZE);
1263 rc = req_capsule_server_pack(pill);
1265 CERROR("%s: Can't pack response: rc = %d\n",
1266 tgt_name(tsi->tsi_tgt), rc);
1270 /* Prepare the update reply buffer */
1271 update_reply = req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
1272 if (update_reply == NULL)
1273 RETURN(err_serious(-EPROTO));
1274 update_init_reply_buf(update_reply, count);
1275 tti->tti_u.update.tti_update_reply = update_reply;
1277 rc = out_tx_start(env, dt, ta);
1281 tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1283 /* Walk through updates in the request to execute them synchronously */
1284 off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
1285 for (i = 0; i < count; i++) {
1286 struct tgt_handler *h;
1287 struct dt_object *dt_obj;
1289 update = (struct update *)((char *)ubuf + off);
1290 if (old_batchid == -1) {
1291 old_batchid = update->u_batchid;
1292 } else if (old_batchid != update->u_batchid) {
1293 /* Stop the current update transaction,
1294 * create a new one */
1295 rc = out_tx_end(env, ta);
1299 rc = out_tx_start(env, dt, ta);
1302 old_batchid = update->u_batchid;
1305 fid_le_to_cpu(&update->u_fid, &update->u_fid);
1306 if (!fid_is_sane(&update->u_fid)) {
1307 CERROR("%s: invalid FID "DFID": rc = %d\n",
1308 tgt_name(tsi->tsi_tgt), PFID(&update->u_fid),
1310 GOTO(out, rc = err_serious(-EPROTO));
1313 dt_obj = dt_locate(env, dt, &update->u_fid);
1315 GOTO(out, rc = PTR_ERR(dt_obj));
1317 tti->tti_u.update.tti_dt_object = dt_obj;
1318 tti->tti_u.update.tti_update = update;
1319 tti->tti_u.update.tti_update_reply_index = i;
1321 h = out_handler_find(update->u_type);
1322 if (likely(h != NULL)) {
1323 /* For real modification RPC, check if the update
1324 * has been executed */
1325 if (h->th_flags & MUTABOR) {
1326 struct ptlrpc_request *req = tgt_ses_req(tsi);
1328 if (out_check_resent(env, dt, dt_obj, req,
1334 rc = h->th_act(tsi);
1336 CERROR("%s: The unsupported opc: 0x%x\n",
1337 tgt_name(tsi->tsi_tgt), update->u_type);
1338 lu_object_put(env, &dt_obj->do_lu);
1339 GOTO(out, rc = -ENOTSUPP);
1342 lu_object_put(env, &dt_obj->do_lu);
1345 off += cfs_size_round(update_size(update));
1348 rc1 = out_tx_end(env, ta);
1354 struct tgt_handler tgt_out_handlers[] = {
1355 TGT_UPDATE_HDL(MUTABOR, UPDATE_OBJ, out_handle),
1357 EXPORT_SYMBOL(tgt_out_handlers);