4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2013, Intel Corporation.
25 * lustre/mdt/out_handler.c
27 * Object update handler between targets.
29 * Author: di.wang <di.wang@intel.com>
32 #define DEBUG_SUBSYSTEM S_CLASS
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
/*
 * Fill slot i of ta->ta_args with an exec/undo callback pair plus the
 * caller's file and line, and return a pointer to that slot.
 * The stored file/line are what out_tx_end prints when an exec callback
 * fails. How i is derived is on lines not present in this listing.
 */
39 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
40 tx_exec_func_t undo, char *file, int line)
/* ta_args has a fixed capacity; running past it is treated as a fatal bug */
48 LASSERT(i < UPDATE_MAX_OPS);
52 ta->ta_args[i].exec_fn = func;
53 ta->ta_args[i].undo_fn = undo;
54 ta->ta_args[i].file = file;
55 ta->ta_args[i].line = line;
57 return &ta->ta_args[i];
/*
 * Reset *ta and create a new transaction handle on device dt.
 * On failure the error is logged and PTR_ERR() of the handle is returned.
 * On success the handle is flagged for synchronous commit.
 */
60 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
61 struct thandle_exec_args *ta)
/* wipe any state left over from a previous transaction */
63 memset(ta, 0, sizeof(*ta));
64 ta->ta_handle = dt_trans_create(env, dt);
65 if (IS_ERR(ta->ta_handle)) {
66 CERROR("%s: start handle error: rc = %ld\n",
67 dt_obd_name(dt), PTR_ERR(ta->ta_handle));
68 return PTR_ERR(ta->ta_handle);
71 /*For phase I, sync for cross-ref operation*/
72 ta->ta_handle->th_sync = 1;
/*
 * Start the transaction held in ta->ta_handle on ta->ta_dev and return the
 * result of dt_trans_start(). The handle must already be marked th_sync.
 */
76 static int out_trans_start(const struct lu_env *env,
77 struct thandle_exec_args *ta)
79 /* Always do sync commit for Phase I */
80 LASSERT(ta->ta_handle->th_sync != 0);
81 return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
/*
 * Stop the transaction in ta->ta_handle, recording err as its th_result,
 * then drop the object reference held by every registered tx_arg.
 */
84 static int out_trans_stop(const struct lu_env *env,
85 struct thandle_exec_args *ta, int err)
90 ta->ta_handle->th_result = err;
91 LASSERT(ta->ta_handle->th_sync != 0);
92 rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
/* release the references taken with lu_object_get() when the ops were queued */
93 for (i = 0; i < ta->ta_argno; i++) {
94 if (ta->ta_args[i].object != NULL) {
95 lu_object_put(env, &ta->ta_args[i].object->do_lu);
96 ta->ta_args[i].object = NULL;
/*
 * Finish the transaction described by ta: start it, run every queued exec
 * callback, invoke undo callbacks when an exec fails, then stop the
 * transaction and clear ta->ta_handle.
 * If a declare step already failed (ta_err != 0) or nothing was queued,
 * execution is skipped and control jumps to the stop label with ta_err.
 */
103 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
105 struct tgt_session_info *tsi = tgt_ses_info(env);
109 LASSERT(ta->ta_handle);
111 if (ta->ta_err != 0 || ta->ta_argno == 0)
112 GOTO(stop, rc = ta->ta_err);
114 rc = out_trans_start(env, ta);
/* run the queued operations in the order they were registered */
118 for (i = 0; i < ta->ta_argno; i++) {
119 rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
122 CDEBUG(D_INFO, "error during execution of #%u from"
123 " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
124 ta->ta_args[i].line, rc);
/*
 * NOTE(review): __out_tx_xattr_set registers a NULL undo callback; if the
 * rollback ever reaches such an entry this assertion fires. The rollback
 * loop bounds are not visible in this listing -- confirm.
 */
126 LASSERTF(ta->ta_args[i].undo_fn != NULL,
127 "can't undo changes, hope for failover!\n");
128 ta->ta_args[i].undo_fn(env, ta->ta_handle,
135 /* Only fail for real update */
136 tsi->tsi_reply_fail_id = OBD_FAIL_UPDATE_OBJ_NET_REP;
138 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
139 dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
/* the execution result becomes the transaction result */
140 out_trans_stop(env, ta, rc);
141 ta->ta_handle = NULL;
/*
 * Reply reconstruction callback: logs the call and stores an empty reply
 * (no buffer, length 0) with result 0 at the given index.
 */
148 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
149 struct dt_object *obj, struct update_reply *reply,
152 CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
153 dt_obd_name(dt), reply, index, 0);
155 update_insert_reply(reply, NULL, 0, index, 0);
/*
 * Signature of the callback out_check_resent invokes to rebuild the reply of
 * an update that was already executed.
 */
159 typedef void (*out_reconstruct_t)(const struct lu_env *env,
160 struct dt_device *dt,
161 struct dt_object *obj,
162 struct update_reply *reply,
/*
 * Handle the MSG_RESENT case for one update.
 * If the request is flagged MSG_RESENT and req_xid_is_last() holds, the
 * reconstruct callback rebuilds the reply; otherwise a debug message is
 * emitted. The return values are on lines not present in this listing.
 */
165 static inline int out_check_resent(const struct lu_env *env,
166 struct dt_device *dt,
167 struct dt_object *obj,
168 struct ptlrpc_request *req,
169 out_reconstruct_t reconstruct,
170 struct update_reply *reply,
/* common case: the request is not a resend */
173 if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
176 if (req_xid_is_last(req)) {
177 reconstruct(env, dt, obj, reply, index);
180 DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
181 req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
/*
 * Destroy dt_obj inside transaction th, holding the object's write lock
 * around the dt_destroy() call. The FID being destroyed is logged first.
 */
185 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
190 CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
191 PFID(lu_object_fid(&dt_obj->do_lu)));
193 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
194 rc = dt_destroy(env, dt_obj, th);
195 dt_write_unlock(env, dt_obj);
201 * The xxx_undo handlers are only invoked when execution fails.
202 * All required resources are reserved in the declare phase, so once
203 * declare succeeds the following execute phase is expected to
204 * succeed as well; these undo handlers should therefore rarely be
205 * needed in Phase I.
/*
 * Undo callback for create: destroys arg->object through out_obj_destroy().
 * The CERROR below reports a failed undo.
 */
207 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
212 rc = out_obj_destroy(env, arg->object, th);
214 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
215 dt_obd_name(th->th_dev), rc);
/*
 * Exec callback for create: creates arg->object with the attributes, hint
 * and format saved in arg->u.create, under the object's write lock, then
 * stores the result code in the reply slot arg->index.
 */
219 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
222 struct dt_object *dt_obj = arg->object;
225 CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
226 dt_obd_name(th->th_dev),
227 PFID(lu_object_fid(&arg->object->do_lu)),
228 arg->u.create.dof.dof_type,
229 arg->u.create.attr.la_mode & S_IFMT);
231 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
232 rc = dt_create(env, dt_obj, &arg->u.create.attr,
233 &arg->u.create.hint, &arg->u.create.dof, th);
235 dt_write_unlock(env, dt_obj);
237 CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
238 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
/* the reply carries only the result code, no payload */
240 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Declare an object creation in the open transaction and queue the matching
 * exec/undo pair (out_tx_create_exec / out_tx_create_undo).
 * The declare result is stored in ta->ta_err. The queued tx_arg takes a
 * reference on obj and keeps private copies of attr, parent fid and dof.
 */
245 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
246 struct lu_attr *attr, struct lu_fid *parent_fid,
247 struct dt_object_format *dof,
248 struct thandle_exec_args *ta,
249 struct update_reply *reply,
250 int index, char *file, int line)
254 LASSERT(ta->ta_handle != NULL);
255 ta->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
260 arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
264 /* release the object in out_trans_stop */
265 lu_object_get(&obj->do_lu);
267 arg->u.create.attr = *attr;
/*
 * NOTE(review): out_create passes a NULL fid for non-directories; the guard
 * for this dereference is presumably on a line missing from this listing --
 * confirm.
 */
269 arg->u.create.fid = *parent_fid;
270 memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
271 arg->u.create.dof = *dof;
/*
 * Handler for an object-create update.
 * Parameter 0 must be a wire obdo of exactly sizeof(*wobdo); it is converted
 * to CPU byte order and turned into lu_attr. For directories, parameter 1
 * must be a sane FID of exactly sizeof(*fid). Malformed input is rejected
 * with err_serious(-EPROTO). The create is then queued via out_tx_create().
 */
278 static int out_create(struct tgt_session_info *tsi)
280 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
281 struct update *update = tti->tti_u.update.tti_update;
282 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
283 struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
284 struct obdo *lobdo = &tti->tti_u.update.tti_obdo;
285 struct lu_attr *attr = &tti->tti_attr;
286 struct lu_fid *fid = NULL;
293 wobdo = update_param_buf(update, 0, &size);
294 if (wobdo == NULL || size != sizeof(*wobdo)) {
295 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
296 tgt_name(tsi->tsi_tgt), -EPROTO);
297 RETURN(err_serious(-EPROTO));
/* byte-swap the wire obdo in place, then derive the attributes from it */
300 obdo_le_to_cpu(wobdo, wobdo);
301 lustre_get_wire_obdo(NULL, lobdo, wobdo);
302 la_from_obdo(attr, lobdo, lobdo->o_valid);
304 dof->dof_type = dt_mode_to_dft(attr->la_mode);
/* only directories carry a FID as the second parameter */
305 if (S_ISDIR(attr->la_mode)) {
308 fid = update_param_buf(update, 1, &size);
309 if (fid == NULL || size != sizeof(*fid)) {
310 CERROR("%s: invalid fid: rc = %d\n",
311 tgt_name(tsi->tsi_tgt), -EPROTO);
312 RETURN(err_serious(-EPROTO));
314 fid_le_to_cpu(fid, fid);
315 if (!fid_is_sane(fid)) {
316 CERROR("%s: invalid fid "DFID": rc = %d\n",
317 tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
318 RETURN(err_serious(-EPROTO));
/* an already-existing object is handled on a line not present in this listing */
322 if (lu_object_exists(&obj->do_lu))
325 rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
327 tti->tti_u.update.tti_update_reply,
328 tti->tti_u.update.tti_update_reply_index);
/*
 * Undo callback for attr_set. Rollback is not implemented: the visible body
 * only logs an error naming the object's FID and -ENOTSUPP.
 */
333 static int out_tx_attr_set_undo(const struct lu_env *env,
334 struct thandle *th, struct tx_arg *arg)
336 CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
337 dt_obd_name(th->th_dev),
338 PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
/*
 * Exec callback for attr_set: applies arg->u.attr_set.attr to arg->object
 * under the object's write lock and stores the result code in the reply
 * slot arg->index.
 */
343 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
346 struct dt_object *dt_obj = arg->object;
349 CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
350 PFID(lu_object_fid(&dt_obj->do_lu)));
352 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
353 rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
354 dt_write_unlock(env, dt_obj);
356 CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
357 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
359 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Declare an attribute update in the open transaction and queue the
 * exec/undo pair for it. The declare result is stored in th->ta_err. The
 * queued tx_arg takes a reference on dt_obj and keeps a copy of *attr.
 * Note that the parameter named th here is the thandle_exec_args, not a
 * struct thandle.
 */
364 static int __out_tx_attr_set(const struct lu_env *env,
365 struct dt_object *dt_obj,
366 const struct lu_attr *attr,
367 struct thandle_exec_args *th,
368 struct update_reply *reply, int index,
369 char *file, int line)
373 LASSERT(th->ta_handle != NULL);
374 th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
378 arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
/* reference is dropped again in out_trans_stop */
381 lu_object_get(&dt_obj->do_lu);
382 arg->object = dt_obj;
383 arg->u.attr_set.attr = *attr;
/*
 * Handler for an attr_set update.
 * Parameter 0 must be a wire obdo of exactly sizeof(*wobdo), otherwise the
 * request is rejected with err_serious(-EPROTO). The obdo is converted to
 * CPU byte order and to lu_attr, then the update is queued via
 * out_tx_attr_set().
 */
389 static int out_attr_set(struct tgt_session_info *tsi)
391 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
392 struct update *update = tti->tti_u.update.tti_update;
393 struct lu_attr *attr = &tti->tti_attr;
394 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
395 struct obdo *lobdo = &tti->tti_u.update.tti_obdo;
402 wobdo = update_param_buf(update, 0, &size);
403 if (wobdo == NULL || size != sizeof(*wobdo)) {
404 CERROR("%s: empty obdo in the update: rc = %d\n",
405 tgt_name(tsi->tsi_tgt), -EPROTO);
406 RETURN(err_serious(-EPROTO));
/* byte-swap the wire obdo in place, then derive the attributes from it */
411 obdo_le_to_cpu(wobdo, wobdo);
412 lustre_get_wire_obdo(NULL, lobdo, wobdo);
413 la_from_obdo(attr, lobdo, lobdo->o_valid);
415 rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
416 tti->tti_u.update.tti_update_reply,
417 tti->tti_u.update.tti_update_reply_index);
/*
 * Handler for an attr_get update.
 * Reads the object's attributes under its read lock. For a directory it
 * also walks the index iterator for up to three entries; per the comment
 * below this is an emptiness check reported through la_flags. The
 * attributes are converted to a little-endian wire obdo and inserted into
 * the reply together with rc.
 */
422 static int out_attr_get(struct tgt_session_info *tsi)
424 const struct lu_env *env = tsi->tsi_env;
425 struct tgt_thread_info *tti = tgt_th_info(env);
426 struct obdo *obdo = &tti->tti_u.update.tti_obdo;
427 struct lu_attr *la = &tti->tti_attr;
428 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
433 if (!lu_object_exists(&obj->do_lu))
436 dt_read_lock(env, obj, MOR_TGT_CHILD);
437 rc = dt_attr_get(env, obj, la, NULL);
439 GOTO(out_unlock, rc);
441 * If it is a directory, we will also check whether the
442 * directory is empty.
443 * la_flags = 0 : Empty.
447 if (S_ISDIR(la->la_mode)) {
449 const struct dt_it_ops *iops;
451 if (!dt_try_as_dir(env, obj))
452 GOTO(out_unlock, rc = -ENOTDIR);
454 iops = &obj->do_index_ops->dio_it;
455 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
/* position the iterator at the empty key, i.e. the start of the index */
458 result = iops->get(env, it, (const void *)"");
/* step through at most three entries, stopping early when next() != 0 */
461 for (result = 0, i = 0; result == 0 && i < 3;
463 result = iops->next(env, it);
466 } else if (result == 0)
468 * Huh? Index contains no zero key?
/* convert to a wire obdo in little-endian order, in place */
478 obdo_from_la(obdo, la, la->la_valid);
479 obdo_cpu_to_le(obdo, obdo);
480 lustre_set_wire_obdo(NULL, obdo, obdo);
483 dt_read_unlock(env, obj);
485 CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
486 tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
/*
 * NOTE(review): the reply goes to index 0 rather than
 * tti_update_reply_index -- presumably reads are sent as single-update
 * requests; confirm.
 */
489 update_insert_reply(tti->tti_u.update.tti_update_reply, obdo,
490 sizeof(*obdo), 0, rc);
/*
 * Handler for an xattr_get update.
 * Parameter 0 is the xattr name. The value is read with dt_xattr_get()
 * under the object's read lock, directly into reply buffer 0 just after a
 * leading int reserved for the result code. ur_lens[0] is then set to the
 * value length plus that int.
 */
494 static int out_xattr_get(struct tgt_session_info *tsi)
496 const struct lu_env *env = tsi->tsi_env;
497 struct tgt_thread_info *tti = tgt_th_info(env);
498 struct update *update = tti->tti_u.update.tti_update;
499 struct lu_buf *lbuf = &tti->tti_buf;
500 struct update_reply *reply = tti->tti_u.update.tti_update_reply;
501 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
508 name = (char *)update_param_buf(update, 0, NULL);
510 CERROR("%s: empty name for xattr get: rc = %d\n",
511 tgt_name(tsi->tsi_tgt), -EPROTO);
512 RETURN(err_serious(-EPROTO));
515 ptr = update_get_buf_internal(reply, 0, NULL);
516 LASSERT(ptr != NULL);
518 /* The first 4 bytes(int) are used to store the result */
519 lbuf->lb_buf = (char *)ptr + sizeof(int);
/*
 * NOTE(review): the length subtracts sizeof(struct update_reply) only;
 * whether that also covers the leading result int and the offset of buffer 0
 * inside the reply cannot be told from here -- confirm the bound.
 */
520 lbuf->lb_len = UPDATE_BUFFER_SIZE - sizeof(struct update_reply);
521 dt_read_lock(env, obj, MOR_TGT_CHILD);
522 rc = dt_xattr_get(env, obj, lbuf, name, NULL);
523 dt_read_unlock(env, obj);
530 GOTO(out, rc = -ENOENT);
534 CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
535 tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
536 name, (int)lbuf->lb_len);
539 reply->ur_lens[0] = lbuf->lb_len + sizeof(int);
/*
 * Handler for an index_lookup update.
 * Parameter 0 is the name to look up. Under the object's read lock the
 * object is checked to be usable as a directory (-ENOTDIR otherwise), the
 * name is resolved with dt_lookup() into tti_fid1, and the FID is converted
 * to little-endian and inserted into the reply together with rc.
 */
543 static int out_index_lookup(struct tgt_session_info *tsi)
545 const struct lu_env *env = tsi->tsi_env;
546 struct tgt_thread_info *tti = tgt_th_info(env);
547 struct update *update = tti->tti_u.update.tti_update;
548 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
554 if (!lu_object_exists(&obj->do_lu))
557 name = (char *)update_param_buf(update, 0, NULL);
559 CERROR("%s: empty name for lookup: rc = %d\n",
560 tgt_name(tsi->tsi_tgt), -EPROTO);
561 RETURN(err_serious(-EPROTO));
564 dt_read_lock(env, obj, MOR_TGT_CHILD);
565 if (!dt_try_as_dir(env, obj))
566 GOTO(out_unlock, rc = -ENOTDIR);
568 rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
569 (struct dt_key *)name, NULL);
572 GOTO(out_unlock, rc);
577 CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
578 PFID(lu_object_fid(&obj->do_lu)), name,
579 PFID(&tti->tti_fid1), rc);
/* the reply is sent in little-endian byte order */
580 fid_cpu_to_le(&tti->tti_fid1, &tti->tti_fid1);
583 dt_read_unlock(env, obj);
585 CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
586 tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
/* as in out_attr_get, the reply slot used here is index 0 */
589 update_insert_reply(tti->tti_u.update.tti_update_reply,
590 &tti->tti_fid1, sizeof(tti->tti_fid1), 0, rc);
/*
 * Exec callback for xattr_set: writes arg->u.xattr_set.buf under the given
 * name and flags while holding the object's write lock, then stores the
 * result code in the reply slot arg->index.
 * A failure on the XATTR_NAME_LINK attribute is singled out below; per the
 * in-line comment such errors are ignored.
 */
594 static int out_tx_xattr_set_exec(const struct lu_env *env,
598 struct dt_object *dt_obj = arg->object;
601 CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
602 dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
603 arg->u.xattr_set.name, arg->u.xattr_set.flags);
605 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
606 rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
607 arg->u.xattr_set.name, arg->u.xattr_set.flags,
609 dt_write_unlock(env, dt_obj);
611 * Ignore errors if this is LINK EA
613 if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
616 CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
617 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
619 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Declare an xattr update in the open transaction and queue its exec
 * callback. The declare result is stored in ta->ta_err. The queued tx_arg
 * takes a reference on dt_obj and records name, flags and a copy of the
 * lu_buf descriptor.
 * The name pointer and the buffer memory are not duplicated, so they must
 * stay valid until the transaction is executed.
 */
624 static int __out_tx_xattr_set(const struct lu_env *env,
625 struct dt_object *dt_obj,
626 const struct lu_buf *buf,
627 const char *name, int flags,
628 struct thandle_exec_args *ta,
629 struct update_reply *reply, int index,
630 char *file, int line)
634 LASSERT(ta->ta_handle != NULL);
635 ta->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
636 flags, ta->ta_handle);
/* no undo callback is registered for xattr_set */
640 arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
642 lu_object_get(&dt_obj->do_lu);
643 arg->object = dt_obj;
644 arg->u.xattr_set.name = name;
645 arg->u.xattr_set.flags = flags;
646 arg->u.xattr_set.buf = *buf;
649 arg->u.xattr_set.csum = 0;
/*
 * Handler for an xattr_set update.
 * Parameter 0 is the xattr name, parameter 1 the non-empty value buffer and
 * parameter 2 a little-endian 32-bit flag word. Missing parameters are
 * rejected with err_serious(-EPROTO). The update is then queued via
 * out_tx_xattr_set().
 */
653 static int out_xattr_set(struct tgt_session_info *tsi)
655 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
656 struct update *update = tti->tti_u.update.tti_update;
657 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
658 struct lu_buf *lbuf = &tti->tti_buf;
667 name = update_param_buf(update, 0, NULL);
669 CERROR("%s: empty name for xattr set: rc = %d\n",
670 tgt_name(tsi->tsi_tgt), -EPROTO);
671 RETURN(err_serious(-EPROTO));
674 buf = (char *)update_param_buf(update, 1, &buf_len);
675 if (buf == NULL || buf_len == 0) {
676 CERROR("%s: empty buf for xattr set: rc = %d\n",
677 tgt_name(tsi->tsi_tgt), -EPROTO);
678 RETURN(err_serious(-EPROTO));
682 lbuf->lb_len = buf_len;
/*
 * NOTE(review): the flag parameter is fetched without asking for its size,
 * yet 4 bytes are read from it below -- confirm the length is validated
 * elsewhere.
 */
684 tmp = (char *)update_param_buf(update, 2, NULL);
686 CERROR("%s: empty flag for xattr set: rc = %d\n",
687 tgt_name(tsi->tsi_tgt), -EPROTO);
688 RETURN(err_serious(-EPROTO));
691 flag = le32_to_cpu(*(int *)tmp);
693 rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
695 tti->tti_u.update.tti_update_reply,
696 tti->tti_u.update.tti_update_reply_index);
/*
 * Call dt_ref_add() on dt_obj inside transaction th while holding the
 * object's write lock.
 */
700 static int out_obj_ref_add(const struct lu_env *env,
701 struct dt_object *dt_obj,
706 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
707 rc = dt_ref_add(env, dt_obj, th);
708 dt_write_unlock(env, dt_obj);
/*
 * Call dt_ref_del() on dt_obj inside transaction th while holding the
 * object's write lock.
 */
713 static int out_obj_ref_del(const struct lu_env *env,
714 struct dt_object *dt_obj,
719 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
720 rc = dt_ref_del(env, dt_obj, th);
721 dt_write_unlock(env, dt_obj);
/*
 * Exec callback for ref_add: runs out_obj_ref_add() on arg->object and
 * stores the result code in the reply slot arg->index.
 */
726 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
729 struct dt_object *dt_obj = arg->object;
732 rc = out_obj_ref_add(env, dt_obj, th);
734 CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
735 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
737 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/* Undo callback for ref_add: reverses it with out_obj_ref_del() on arg->object. */
741 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
744 return out_obj_ref_del(env, arg->object, th);
/*
 * Declare a ref_add in the open transaction and queue the exec/undo pair
 * for it. The declare result is stored in ta->ta_err. The queued tx_arg
 * takes a reference on dt_obj.
 */
747 static int __out_tx_ref_add(const struct lu_env *env,
748 struct dt_object *dt_obj,
749 struct thandle_exec_args *ta,
750 struct update_reply *reply,
751 int index, char *file, int line)
755 LASSERT(ta->ta_handle != NULL);
756 ta->ta_err = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
760 arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
/* reference is dropped again in out_trans_stop */
763 lu_object_get(&dt_obj->do_lu);
764 arg->object = dt_obj;
771 * increase ref of the object
/*
 * Handler for a ref_add update: queues the operation for the current
 * object via out_tx_ref_add(), using the per-thread reply and reply index.
 */
773 static int out_ref_add(struct tgt_session_info *tsi)
775 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
776 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
781 rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
782 tti->tti_u.update.tti_update_reply,
783 tti->tti_u.update.tti_update_reply_index);
/*
 * Exec callback for ref_del: runs out_obj_ref_del() on arg->object and
 * stores the result code in the reply slot arg->index.
 * The debug message reports the actual rc, matching the other exec
 * callbacks and the value inserted into the reply.
 */
787 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
790 struct dt_object *dt_obj = arg->object;
793 rc = out_obj_ref_del(env, dt_obj, th);
795 CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
796 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
798 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/* Undo callback for ref_del: reverses it with out_obj_ref_add() on arg->object. */
803 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
806 return out_obj_ref_add(env, arg->object, th);
/*
 * Declare a ref_del in the open transaction and queue the exec/undo pair
 * for it. The declare result is stored in ta->ta_err. The queued tx_arg
 * takes a reference on dt_obj.
 */
809 static int __out_tx_ref_del(const struct lu_env *env,
810 struct dt_object *dt_obj,
811 struct thandle_exec_args *ta,
812 struct update_reply *reply,
813 int index, char *file, int line)
817 LASSERT(ta->ta_handle != NULL);
818 ta->ta_err = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
822 arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
/* reference is dropped again in out_trans_stop */
825 lu_object_get(&dt_obj->do_lu);
826 arg->object = dt_obj;
/*
 * Handler for a ref_del update: checks whether the object exists (the
 * action taken for a missing object is not visible in this listing) and
 * queues the operation via out_tx_ref_del().
 */
832 static int out_ref_del(struct tgt_session_info *tsi)
834 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
835 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
840 if (!lu_object_exists(&obj->do_lu))
843 rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
844 tti->tti_u.update.tti_update_reply,
845 tti->tti_u.update.tti_update_reply_index);
/*
 * Insert (key -> rec) into the index of dt_obj inside transaction th.
 * The object is first checked with dt_try_as_dir(); the insert itself runs
 * under the object's write lock. The debug message treats key as a name
 * string and rec as a struct lu_fid.
 */
849 static int out_obj_index_insert(const struct lu_env *env,
850 struct dt_object *dt_obj,
851 const struct dt_rec *rec,
852 const struct dt_key *key,
857 CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
858 dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
859 (char *)key, PFID((struct lu_fid *)rec));
/* a zero result means the object cannot be used as a directory */
861 if (dt_try_as_dir(env, dt_obj) == 0)
864 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
865 rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
866 dt_write_unlock(env, dt_obj);
/*
 * Delete key from the index of dt_obj inside transaction th.
 * The object is first checked with dt_try_as_dir(); the delete itself runs
 * under the object's write lock.
 */
871 static int out_obj_index_delete(const struct lu_env *env,
872 struct dt_object *dt_obj,
873 const struct dt_key *key,
878 CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
879 dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
/* a zero result means the object cannot be used as a directory */
882 if (dt_try_as_dir(env, dt_obj) == 0)
885 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
886 rc = dt_delete(env, dt_obj, key, th, NULL);
887 dt_write_unlock(env, dt_obj);
/*
 * Exec callback for index_insert: inserts the saved (key, rec) pair into
 * arg->object and stores the result code in the reply slot arg->index.
 */
892 static int out_tx_index_insert_exec(const struct lu_env *env,
893 struct thandle *th, struct tx_arg *arg)
895 struct dt_object *dt_obj = arg->object;
898 rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
899 arg->u.insert.key, th);
901 CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
902 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
904 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/* Undo callback for index_insert: removes the key that the exec step inserted. */
909 static int out_tx_index_insert_undo(const struct lu_env *env,
910 struct thandle *th, struct tx_arg *arg)
912 return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
/*
 * Declare an index insert of (name -> fid) and queue the exec/undo pair
 * for it. If the object already exists it must be usable as a directory,
 * otherwise ta->ta_err is set to -ENOTDIR. The declare result is stored in
 * ta->ta_err.
 * The queued tx_arg takes a reference on dt_obj and stores the name and fid
 * pointers without copying them, so both must stay valid until the
 * transaction is executed.
 */
915 static int __out_tx_index_insert(const struct lu_env *env,
916 struct dt_object *dt_obj,
917 char *name, struct lu_fid *fid,
918 struct thandle_exec_args *ta,
919 struct update_reply *reply,
920 int index, char *file, int line)
924 LASSERT(ta->ta_handle != NULL);
926 if (lu_object_exists(&dt_obj->do_lu)) {
927 if (dt_try_as_dir(env, dt_obj) == 0) {
928 ta->ta_err = -ENOTDIR;
931 ta->ta_err = dt_declare_insert(env, dt_obj,
932 (struct dt_rec *)fid,
933 (struct dt_key *)name,
940 arg = tx_add_exec(ta, out_tx_index_insert_exec,
941 out_tx_index_insert_undo, file,
/* reference is dropped again in out_trans_stop */
944 lu_object_get(&dt_obj->do_lu);
945 arg->object = dt_obj;
948 arg->u.insert.rec = (struct dt_rec *)fid;
949 arg->u.insert.key = (struct dt_key *)name;
/*
 * Handler for an index_insert update.
 * Parameter 0 is the entry name; parameter 1 must be a sane FID of exactly
 * sizeof(*fid), converted in place to CPU byte order. Malformed input is
 * rejected with err_serious(-EPROTO). The insert is then queued via
 * out_tx_index_insert().
 */
954 static int out_index_insert(struct tgt_session_info *tsi)
956 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
957 struct update *update = tti->tti_u.update.tti_update;
958 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
966 name = (char *)update_param_buf(update, 0, NULL);
968 CERROR("%s: empty name for index insert: rc = %d\n",
969 tgt_name(tsi->tsi_tgt), -EPROTO);
970 RETURN(err_serious(-EPROTO));
973 fid = (struct lu_fid *)update_param_buf(update, 1, &size);
974 if (fid == NULL || size != sizeof(*fid)) {
975 CERROR("%s: invalid fid: rc = %d\n",
976 tgt_name(tsi->tsi_tgt), -EPROTO);
977 RETURN(err_serious(-EPROTO));
980 fid_le_to_cpu(fid, fid);
981 if (!fid_is_sane(fid)) {
982 CERROR("%s: invalid FID "DFID": rc = %d\n",
983 tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
984 RETURN(err_serious(-EPROTO));
987 rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
989 tti->tti_u.update.tti_update_reply,
990 tti->tti_u.update.tti_update_reply_index);
/*
 * Exec callback for index_delete: removes the saved key from arg->object
 * and stores the result code in the reply slot arg->index.
 * The key is kept in arg->u.insert.key, the same field __out_tx_index_delete
 * fills in.
 */
994 static int out_tx_index_delete_exec(const struct lu_env *env,
1000 rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1002 CDEBUG(D_INFO, "%s: insert idx delete reply %p index %d: rc = %d\n",
1003 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1005 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Undo callback for index_delete. Rollback is not implemented: the visible
 * body only logs an error with -ENOTSUPP.
 */
1010 static int out_tx_index_delete_undo(const struct lu_env *env,
1014 CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1015 dt_obd_name(th->th_dev), -ENOTSUPP);
/*
 * Declare an index delete of name and queue the exec/undo pair for it.
 * The object must be usable as a directory, otherwise ta->ta_err is set to
 * -ENOTDIR. The declare result is stored in ta->ta_err and checked before
 * the operation is queued.
 * The queued tx_arg takes a reference on dt_obj and stores the name pointer
 * (not a copy) in u.insert.key.
 */
1019 static int __out_tx_index_delete(const struct lu_env *env,
1020 struct dt_object *dt_obj, char *name,
1021 struct thandle_exec_args *ta,
1022 struct update_reply *reply,
1023 int index, char *file, int line)
1027 if (dt_try_as_dir(env, dt_obj) == 0) {
1028 ta->ta_err = -ENOTDIR;
1032 LASSERT(ta->ta_handle != NULL);
1033 ta->ta_err = dt_declare_delete(env, dt_obj,
1034 (struct dt_key *)name,
1036 if (ta->ta_err != 0)
1039 arg = tx_add_exec(ta, out_tx_index_delete_exec,
1040 out_tx_index_delete_undo, file,
/* reference is dropped again in out_trans_stop */
1043 lu_object_get(&dt_obj->do_lu);
1044 arg->object = dt_obj;
1047 arg->u.insert.key = (struct dt_key *)name;
/*
 * Handler for an index_delete update.
 * Checks whether the object exists (the action for a missing object is not
 * visible in this listing), fetches the entry name from parameter 0 and
 * rejects a missing name with err_serious(-EPROTO), then queues the delete
 * via out_tx_index_delete().
 */
1051 static int out_index_delete(struct tgt_session_info *tsi)
1053 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
1054 struct update *update = tti->tti_u.update.tti_update;
1055 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
1059 if (!lu_object_exists(&obj->do_lu))
1062 name = (char *)update_param_buf(update, 0, NULL);
1064 CERROR("%s: empty name for index delete: rc = %d\n",
1065 tgt_name(tsi->tsi_tgt), -EPROTO);
1066 RETURN(err_serious(-EPROTO));
1069 rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
1070 tti->tti_u.update.tti_update_reply,
1071 tti->tti_u.update.tti_update_reply_index);
/*
 * Exec callback for destroy: runs out_obj_destroy() on arg->object and
 * stores the result code in the reply slot arg->index.
 */
1075 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1078 struct dt_object *dt_obj = arg->object;
1081 rc = out_obj_destroy(env, dt_obj, th);
1083 CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1084 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1086 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Undo callback for destroy. Rollback is not implemented: the visible body
 * only logs an error with -ENOTSUPP.
 */
1091 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1094 CERROR("%s: not support destroy undo yet!: rc = %d\n",
1095 dt_obd_name(th->th_dev), -ENOTSUPP);
/*
 * Declare an object destroy in the open transaction and queue the
 * exec/undo pair for it. The declare result is stored in ta->ta_err. The
 * queued tx_arg takes a reference on dt_obj.
 */
1099 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1100 struct thandle_exec_args *ta,
1101 struct update_reply *reply,
1102 int index, char *file, int line)
1106 LASSERT(ta->ta_handle != NULL);
1107 ta->ta_err = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1111 arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
/* reference is dropped again in out_trans_stop */
1114 lu_object_get(&dt_obj->do_lu);
1115 arg->object = dt_obj;
/*
 * Handler for an object-destroy update.
 * Validates the FID carried in the update header and rejects an insane FID
 * with err_serious(-EPROTO), checks whether the object exists (the action
 * for a missing object is not visible in this listing), then queues the
 * destroy via out_tx_destroy().
 */
1121 static int out_destroy(struct tgt_session_info *tsi)
1123 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
1124 struct update *update = tti->tti_u.update.tti_update;
1125 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
1130 fid = &update->u_fid;
/*
 * NOTE(review): out_handle already converts update->u_fid in place with
 * fid_le_to_cpu() before dispatching to a handler; converting it again here
 * would swap it back on big-endian hosts -- confirm and drop one of the two.
 */
1131 fid_le_to_cpu(fid, fid);
1132 if (!fid_is_sane(fid)) {
1133 CERROR("%s: invalid FID "DFID": rc = %d\n",
1134 tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1135 RETURN(err_serious(-EPROTO));
1138 if (!lu_object_exists(&obj->do_lu))
1141 rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1142 tti->tti_u.update.tti_update_reply,
1143 tti->tti_u.update.tti_update_reply_index);
/*
 * DEF_OUT_HNDL builds one designated initializer for the out_update_ops
 * table, placing the entry at index (opc - OBJ_CREATE) and recording the
 * given flags in th_flags. The remaining fields are on lines not present in
 * this listing.
 * The second define below aliases out_handler to mdt_handler.
 */
1148 #define DEF_OUT_HNDL(opc, name, flags, fn) \
1149 [opc - OBJ_CREATE] = { \
1153 .th_flags = flags, \
1159 #define out_handler mdt_handler
/*
 * Dispatch table for object-update opcodes, indexed by (opc - OBJ_CREATE)
 * through DEF_OUT_HNDL. Handlers that modify objects carry MUTABOR; the
 * read-only ones (attr_get, xattr_get, index_lookup) carry HABEO_REFERO
 * only. Each entry's name string matches its opcode.
 */
1160 static struct tgt_handler out_update_ops[] = {
1161 DEF_OUT_HNDL(OBJ_CREATE, "obj_create", MUTABOR | HABEO_REFERO,
1163 DEF_OUT_HNDL(OBJ_DESTROY, "obj_destroy", MUTABOR | HABEO_REFERO,
1165 DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", MUTABOR | HABEO_REFERO,
1167 DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", MUTABOR | HABEO_REFERO,
1169 DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set", MUTABOR | HABEO_REFERO,
1171 DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get", HABEO_REFERO,
1173 DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", MUTABOR | HABEO_REFERO,
1175 DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", HABEO_REFERO,
1177 DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", HABEO_REFERO,
1179 DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert",
1180 MUTABOR | HABEO_REFERO, out_index_insert),
1181 DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete",
1182 MUTABOR | HABEO_REFERO, out_index_delete),
/*
 * Map an update opcode to its entry in out_update_ops.
 * Opcodes in [OBJ_CREATE, OBJ_LAST) index the table directly, and the
 * entry's th_opc is asserted to match. Any other opcode yields NULL.
 */
1185 struct tgt_handler *out_handler_find(__u32 opc)
1187 struct tgt_handler *h;
1190 if (OBJ_CREATE <= opc && opc < OBJ_LAST) {
1191 h = &out_update_ops[opc - OBJ_CREATE];
1192 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1195 h = NULL; /* unsupported opc */
1201 * Object updates between Targets. Because all the updates have been
1202 * disassembled into object updates on the sender side, OUT calls
1203 * the OSD API directly to execute these updates.
1205 * In DNE phase I all of the updates in the request need to be executed
1206 * in one transaction, and the transaction has to be synchronous.
1208 * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
/*
 * Entry point for an UPDATE_OBJ request.
 * Validates the update buffer (size, presence, magic, count), packs and
 * initialises the reply buffer, opens a transaction and walks the updates
 * in the request. Each update's object is located by FID and the update is
 * dispatched to its handler from out_handler_find(). When u_batchid
 * changes, the current transaction is ended and a new one started. The
 * last transaction is ended after the loop.
 */
1211 int out_handle(struct tgt_session_info *tsi)
1213 const struct lu_env *env = tsi->tsi_env;
1214 struct tgt_thread_info *tti = tgt_th_info(env);
1215 struct thandle_exec_args *ta = &tti->tti_tea;
1216 struct req_capsule *pill = tsi->tsi_pill;
1217 struct dt_device *dt = tsi->tsi_tgt->lut_bottom;
1218 struct update_buf *ubuf;
1219 struct update *update;
1220 struct update_reply *update_reply;
1223 int old_batchid = -1;
1231 req_capsule_set(pill, &RQF_UPDATE_OBJ);
/* the client buffer must be exactly UPDATE_BUFFER_SIZE bytes */
1232 bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
1233 if (bufsize != UPDATE_BUFFER_SIZE) {
1234 CERROR("%s: invalid bufsize %d: rc = %d\n",
1235 tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1236 RETURN(err_serious(-EPROTO));
1239 ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
1241 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1243 RETURN(err_serious(-EPROTO));
1246 if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
1247 CERROR("%s: invalid magic %x expect %x: rc = %d\n",
1248 tgt_name(tsi->tsi_tgt), le32_to_cpu(ubuf->ub_magic),
1249 UPDATE_BUFFER_MAGIC, -EPROTO);
1250 RETURN(err_serious(-EPROTO));
/*
 * NOTE(review): count comes from the wire; no upper bound check against
 * UPDATE_MAX_OPS or the buffer size is visible in this listing -- confirm.
 */
1253 count = le32_to_cpu(ubuf->ub_count);
1255 CERROR("%s: No update!: rc = %d\n",
1256 tgt_name(tsi->tsi_tgt), -EPROTO);
1257 RETURN(err_serious(-EPROTO));
1260 req_capsule_set_size(pill, &RMF_UPDATE_REPLY, RCL_SERVER,
1261 UPDATE_BUFFER_SIZE);
1262 rc = req_capsule_server_pack(pill);
1264 CERROR("%s: Can't pack response: rc = %d\n",
1265 tgt_name(tsi->tsi_tgt), rc);
1269 /* Prepare the update reply buffer */
1270 update_reply = req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
1271 if (update_reply == NULL)
1272 RETURN(err_serious(-EPROTO));
1273 update_init_reply_buf(update_reply, count);
1274 tti->tti_u.update.tti_update_reply = update_reply;
1276 rc = out_tx_start(env, dt, ta);
/* set for every request that is not a replay */
1280 tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1282 /* Walk through updates in the request to execute them synchronously */
1283 off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
1284 for (i = 0; i < count; i++) {
1285 struct tgt_handler *h;
1286 struct dt_object *dt_obj;
1288 update = (struct update *)((char *)ubuf + off);
/* the first update sets the batch id; a change of id closes the transaction */
1289 if (old_batchid == -1) {
1290 old_batchid = update->u_batchid;
1291 } else if (old_batchid != update->u_batchid) {
1292 /* Stop the current update transaction,
1293 * create a new one */
1294 rc = out_tx_end(env, ta);
1298 rc = out_tx_start(env, dt, ta);
1301 old_batchid = update->u_batchid;
/* the FID is converted to CPU byte order in place, inside the request buffer */
1304 fid_le_to_cpu(&update->u_fid, &update->u_fid);
1305 if (!fid_is_sane(&update->u_fid)) {
1306 CERROR("%s: invalid FID "DFID": rc = %d\n",
1307 tgt_name(tsi->tsi_tgt), PFID(&update->u_fid),
1309 GOTO(out, rc = err_serious(-EPROTO));
1312 dt_obj = dt_locate(env, dt, &update->u_fid);
1314 GOTO(out, rc = PTR_ERR(dt_obj));
/* publish the per-update context the handlers read from tti */
1316 tti->tti_u.update.tti_dt_object = dt_obj;
1317 tti->tti_u.update.tti_update = update;
1318 tti->tti_u.update.tti_update_reply_index = i;
1320 h = out_handler_find(update->u_type);
1321 if (likely(h != NULL)) {
1322 /* For real modification RPC, check if the update
1323 * has been executed */
1324 if (h->th_flags & MUTABOR) {
1325 struct ptlrpc_request *req = tgt_ses_req(tsi);
1327 if (out_check_resent(env, dt, dt_obj, req,
1333 rc = h->th_act(tsi);
1335 CERROR("%s: The unsupported opc: 0x%x\n",
1336 tgt_name(tsi->tsi_tgt), update->u_type);
1337 lu_object_put(env, &dt_obj->do_lu);
1338 GOTO(out, rc = -ENOTSUPP);
/* drop the dt_locate() reference; queued ops hold their own */
1341 lu_object_put(env, &dt_obj->do_lu);
/* advance to the next update by this update's rounded-up size */
1344 off += cfs_size_round(update_size(update));
/* end the last open transaction; its result is kept separately in rc1 */
1347 rc1 = out_tx_end(env, ta);
/*
 * Exported handler table that routes UPDATE_OBJ requests to out_handle(),
 * registered with the MUTABOR flag.
 */
1353 struct tgt_handler tgt_out_handlers[] = {
1354 TGT_UPDATE_HDL(MUTABOR, UPDATE_OBJ, out_handle),
1356 EXPORT_SYMBOL(tgt_out_handlers);