4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2013, Intel Corporation.
25 * lustre/mdt/out_handler.c
27 * Object update handler between targets.
29 * Author: di.wang <di.wang@intel.com>
32 #define DEBUG_SUBSYSTEM S_CLASS
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
/*
 * Record one operation in the argument array of the transaction described
 * by ta: the exec callback func, the undo callback undo, and the caller's
 * file/line are stored in ta->ta_args[i], and a pointer to that slot is
 * returned.
 *
 * NOTE(review): the statement that chooses slot index i (and any update of
 * ta->ta_argno) is not visible here; presumably i is the next free slot --
 * confirm.
 */
39 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
40 tx_exec_func_t undo, char *file, int line)
/* Adding one more operation must not exceed TX_MAX_OPS. */
47 LASSERTF(ta->ta_argno + 1 <= TX_MAX_OPS,
48 "Too many updates(%d) in one trans\n", ta->ta_argno);
53 ta->ta_args[i].exec_fn = func;
54 ta->ta_args[i].undo_fn = undo;
55 ta->ta_args[i].file = file;
56 ta->ta_args[i].line = line;
58 return &ta->ta_args[i];
/*
 * Reply reconstruction callback: logs the event and inserts an empty
 * result (no data, length 0, rc 0) at position index of reply.
 * out_handle() passes this function to out_check_resent().
 */
61 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
62 struct dt_object *obj,
63 struct object_update_reply *reply,
66 CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
67 dt_obd_name(dt), reply, index, 0);
69 object_update_result_insert(reply, NULL, 0, index, 0);
/*
 * Signature of the callback that out_check_resent() invokes to rebuild
 * the reply entry of an update.
 */
73 typedef void (*out_reconstruct_t)(const struct lu_env *env,
75 struct dt_object *obj,
76 struct object_update_reply *reply,
/*
 * Decide how to treat a possibly resent request.
 *
 * Requests without MSG_RESENT in their message flags take the fast path.
 * For a resent request whose xid is the last one recorded
 * (req_xid_is_last()), the reply entry is rebuilt through the reconstruct
 * callback. Otherwise a D_HA message is logged, showing the last xid
 * stored in the export's target data.
 *
 * NOTE(review): the return statements are not visible here; out_handle()
 * tests the result as a boolean -- confirm the exact return values.
 */
79 static inline int out_check_resent(const struct lu_env *env,
81 struct dt_object *obj,
82 struct ptlrpc_request *req,
83 out_reconstruct_t reconstruct,
84 struct object_update_reply *reply,
/* Common case: the request is not a resend. */
87 if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
90 if (req_xid_is_last(req)) {
91 reconstruct(env, dt, obj, reply, index);
94 DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
95 req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
/*
 * Destroy dt_obj inside transaction th. The dt_destroy() call is made
 * while holding the object's write lock (MOR_TGT_CHILD).
 */
99 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
104 CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
105 PFID(lu_object_fid(&dt_obj->do_lu)));
107 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
108 rc = dt_destroy(env, dt_obj, th);
109 dt_write_unlock(env, dt_obj);
115 * All of the xxx_undo functions are used only when execution fails.
116 * However, because all of the required resources are reserved in the
117 * declare phase, a successful declare should guarantee that the
118 * following execute phase succeeds in any case, so these undo functions
119 * should be unnecessary most of the time in Phase I
/*
 * Undo callback for a create operation: destroys arg->object through
 * out_obj_destroy().
 *
 * NOTE(review): the condition guarding the CERROR below is not visible;
 * presumably it is reported only when the destroy fails -- confirm.
 */
121 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
126 rc = out_obj_destroy(env, arg->object, th);
128 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
129 dt_obd_name(th->th_dev), rc);
/*
 * Exec callback for a create operation: creates arg->object with the
 * attributes, hint and format saved in arg->u.create, then stores the
 * result code in the reply at arg->index.
 */
133 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
136 struct dt_object *dt_obj = arg->object;
139 CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
140 dt_obd_name(th->th_dev),
141 PFID(lu_object_fid(&arg->object->do_lu)),
142 arg->u.create.dof.dof_type,
143 arg->u.create.attr.la_mode & S_IFMT);
/* dt_create() is called with the object write-locked. */
145 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
146 rc = dt_create(env, dt_obj, &arg->u.create.attr,
147 &arg->u.create.hint, &arg->u.create.dof, th);
149 dt_write_unlock(env, dt_obj);
151 CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
152 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
/* The reply entry carries no payload, only rc. */
154 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
/*
 * Declare phase of a create operation.
 *
 * Declares the creation of obj on ta->ta_handle (result kept in
 * ta->ta_err), registers out_tx_create_exec()/out_tx_create_undo() with
 * tx_add_exec(), takes a reference on obj and copies attr, the optional
 * parent_fid and dof into the registered argument. The hint is zeroed.
 *
 * NOTE(review): the check of ta->ta_err after the declare call and the
 * assignments of arg->object/reply/index are not visible here.
 */
159 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
160 struct lu_attr *attr, struct lu_fid *parent_fid,
161 struct dt_object_format *dof,
162 struct thandle_exec_args *ta,
163 struct object_update_reply *reply,
164 int index, char *file, int line)
168 LASSERT(ta->ta_handle != NULL);
169 ta->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
174 arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
178 /* release the object in out_trans_stop */
179 lu_object_get(&obj->do_lu);
181 arg->u.create.attr = *attr;
/* The parent FID is optional; it is copied only when supplied. */
182 if (parent_fid != NULL)
183 arg->u.create.fid = *parent_fid;
184 memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
185 arg->u.create.dof = *dof;
/*
 * Handler for a create update.
 *
 * Parameter 0 of the update must be a wire obdo of exactly
 * sizeof(*wobdo) bytes; it is converted to CPU byte order and then to a
 * lu_attr, from which the object format type is derived. When the update
 * has more than one parameter, parameter 1 must be a struct lu_fid, which
 * is byte-swapped if the request needs it and sanity-checked. Malformed
 * input is rejected with err_serious(-EPROTO). Finally the create is
 * declared through out_tx_create().
 *
 * NOTE(review): the action taken when the object already exists is not
 * visible here.
 */
192 static int out_create(struct tgt_session_info *tsi)
194 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
195 struct object_update *update = tti->tti_u.update.tti_update;
196 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
197 struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
198 struct obdo *lobdo = &tti->tti_u.update.tti_obdo;
199 struct lu_attr *attr = &tti->tti_attr;
200 struct lu_fid *fid = NULL;
/* Parameter 0: the object attributes as a wire obdo. */
207 wobdo = object_update_param_get(update, 0, &size);
208 if (wobdo == NULL || size != sizeof(*wobdo)) {
209 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
210 tgt_name(tsi->tsi_tgt), -EPROTO);
211 RETURN(err_serious(-EPROTO));
/* Convert in place to CPU order, then wire obdo -> local obdo -> lu_attr. */
214 obdo_le_to_cpu(wobdo, wobdo);
215 lustre_get_wire_obdo(NULL, lobdo, wobdo);
216 la_from_obdo(attr, lobdo, lobdo->o_valid);
218 dof->dof_type = dt_mode_to_dft(attr->la_mode);
/* Optional parameter 1: a FID, passed on as parent_fid. */
219 if (update->ou_params_count > 1) {
222 fid = object_update_param_get(update, 1, &size);
223 if (fid == NULL || size != sizeof(*fid)) {
224 CERROR("%s: invalid fid: rc = %d\n",
225 tgt_name(tsi->tsi_tgt), -EPROTO);
226 RETURN(err_serious(-EPROTO));
228 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
229 lustre_swab_lu_fid(fid);
230 if (!fid_is_sane(fid)) {
231 CERROR("%s: invalid fid "DFID": rc = %d\n",
232 tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
233 RETURN(err_serious(-EPROTO));
237 if (lu_object_exists(&obj->do_lu))
240 rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
242 tti->tti_u.update.tti_update_reply,
243 tti->tti_u.update.tti_update_reply_index);
/*
 * Undo callback for attr set. Rollback is not implemented: the function
 * only logs an error quoting -ENOTSUPP for the affected object.
 *
 * NOTE(review): the return statement is not visible here.
 */
248 static int out_tx_attr_set_undo(const struct lu_env *env,
249 struct thandle *th, struct tx_arg *arg)
251 CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
252 dt_obd_name(th->th_dev),
253 PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
/*
 * Exec callback for attr set: applies arg->u.attr_set.attr to arg->object
 * with dt_attr_set() under the object's write lock, then stores the result
 * code (no payload) in the reply at arg->index.
 */
258 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
261 struct dt_object *dt_obj = arg->object;
264 CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
265 PFID(lu_object_fid(&dt_obj->do_lu)));
267 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
268 rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
269 dt_write_unlock(env, dt_obj);
271 CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
272 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
274 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
/*
 * Declare phase of attr set.
 *
 * Declares the attribute change on th->ta_handle (result kept in
 * th->ta_err), registers out_tx_attr_set_exec()/out_tx_attr_set_undo(),
 * takes a reference on dt_obj and saves the object pointer and a copy of
 * attr in the registered argument.
 *
 * Note that the parameter named th is the thandle_exec_args, not a
 * struct thandle.
 *
 * NOTE(review): the check of th->ta_err after the declare call is not
 * visible here.
 */
279 static int __out_tx_attr_set(const struct lu_env *env,
280 struct dt_object *dt_obj,
281 const struct lu_attr *attr,
282 struct thandle_exec_args *th,
283 struct object_update_reply *reply,
284 int index, char *file, int line)
288 LASSERT(th->ta_handle != NULL);
289 th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
293 arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
/* Reference held for the registered argument. */
296 lu_object_get(&dt_obj->do_lu);
297 arg->object = dt_obj;
298 arg->u.attr_set.attr = *attr;
/*
 * Handler for an attr-set update.
 *
 * Parameter 0 of the update must be a wire obdo of exactly
 * sizeof(*wobdo) bytes, otherwise the request is rejected with
 * err_serious(-EPROTO). The obdo is converted to CPU byte order and to a
 * lu_attr, and the change is declared through out_tx_attr_set() on the
 * thread's transaction arguments (tti_tea).
 */
304 static int out_attr_set(struct tgt_session_info *tsi)
306 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
307 struct object_update *update = tti->tti_u.update.tti_update;
308 struct lu_attr *attr = &tti->tti_attr;
309 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
310 struct obdo *lobdo = &tti->tti_u.update.tti_obdo;
317 wobdo = object_update_param_get(update, 0, &size);
318 if (wobdo == NULL || size != sizeof(*wobdo)) {
319 CERROR("%s: empty obdo in the update: rc = %d\n",
320 tgt_name(tsi->tsi_tgt), -EPROTO);
321 RETURN(err_serious(-EPROTO));
/* Convert in place to CPU order, then wire obdo -> local obdo -> lu_attr. */
326 obdo_le_to_cpu(wobdo, wobdo);
327 lustre_get_wire_obdo(NULL, lobdo, wobdo);
328 la_from_obdo(attr, lobdo, lobdo->o_valid);
330 rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
331 tti->tti_u.update.tti_update_reply,
332 tti->tti_u.update.tti_update_reply_index);
/*
 * Handler for an attr-get update.
 *
 * Reads the attributes of the target object under its read lock. For a
 * directory, it additionally walks the directory index with an iterator
 * (at most three next() steps) as part of the emptiness check described
 * in the comment below. The attributes are converted to a little-endian
 * wire obdo and inserted, together with rc, into the reply at slot idx.
 *
 * If the object does not exist, LU_OBJECT_HEARD_BANSHEE is set on its
 * header.
 *
 * NOTE(review): several statements (error exits, the la_flags updates
 * and iterator cleanup) are not visible here.
 */
337 static int out_attr_get(struct tgt_session_info *tsi)
339 const struct lu_env *env = tsi->tsi_env;
340 struct tgt_thread_info *tti = tgt_th_info(env);
341 struct obdo *obdo = &tti->tti_u.update.tti_obdo;
342 struct lu_attr *la = &tti->tti_attr;
343 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
344 int idx = tti->tti_u.update.tti_update_reply_index;
349 if (!lu_object_exists(&obj->do_lu)) {
350 /* Usually, this will be called when the master MDT try
351 * to init a remote object(see osp_object_init), so if
352 * the object does not exist on slave, we need set BANSHEE flag,
353 * so the object can be removed from the cache immediately */
354 set_bit(LU_OBJECT_HEARD_BANSHEE,
355 &obj->do_lu.lo_header->loh_flags);
359 dt_read_lock(env, obj, MOR_TGT_CHILD);
360 rc = dt_attr_get(env, obj, la, NULL);
362 GOTO(out_unlock, rc);
364 * If it is a directory, we will also check whether the
365 * directory is empty.
366 * la_flags = 0 : Empty.
370 if (S_ISDIR(la->la_mode)) {
372 const struct dt_it_ops *iops;
374 if (!dt_try_as_dir(env, obj))
375 GOTO(out_unlock, rc = -ENOTDIR);
377 iops = &obj->do_index_ops->dio_it;
378 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
381 result = iops->get(env, it, (const void *)"");
384 for (result = 0, i = 0; result == 0 && i < 3;
386 result = iops->next(env, it);
389 } else if (result == 0)
391 * Huh? Index contains no zero key?
/* Pack the attributes as a little-endian wire obdo for the reply. */
401 obdo_from_la(obdo, la, la->la_valid);
402 obdo_cpu_to_le(obdo, obdo);
403 lustre_set_wire_obdo(NULL, obdo, obdo);
406 dt_read_unlock(env, obj);
408 CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
409 tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
412 object_update_result_insert(tti->tti_u.update.tti_update_reply, obdo,
413 sizeof(*obdo), idx, rc);
/*
 * Handler for an xattr-get update.
 *
 * Parameter 0 of the update is the xattr name. The value is read with
 * dt_xattr_get(), under the object's read lock, directly into the data
 * area of the reply result slot obtained from
 * object_update_result_get(reply, 0, NULL). The usable length is
 * OUT_UPDATE_REPLY_SIZE minus the rounded offset of our_data inside the
 * result. The result is then inserted into the reply at slot idx.
 *
 * A missing name or a missing result slot is rejected with
 * err_serious(-EPROTO); the two cases log distinct messages.
 *
 * NOTE(review): the handling of the dt_xattr_get() return value (the
 * conditions leading to -ENOENT and the update of lbuf->lb_len) is not
 * visible here.
 */
418 static int out_xattr_get(struct tgt_session_info *tsi)
420 const struct lu_env *env = tsi->tsi_env;
421 struct tgt_thread_info *tti = tgt_th_info(env);
422 struct object_update *update = tti->tti_u.update.tti_update;
423 struct lu_buf *lbuf = &tti->tti_buf;
424 struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
425 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
427 struct object_update_result *update_result;
428 int idx = tti->tti_u.update.tti_update_reply_index;
433 name = object_update_param_get(update, 0, NULL);
435 CERROR("%s: empty name for xattr get: rc = %d\n",
436 tgt_name(tsi->tsi_tgt), -EPROTO);
437 RETURN(err_serious(-EPROTO));
440 update_result = object_update_result_get(reply, 0, NULL);
441 if (update_result == NULL) {
442 CERROR("%s: empty reply buffer for xattr get: rc = %d\n",
443 tgt_name(tsi->tsi_tgt), -EPROTO);
444 RETURN(err_serious(-EPROTO));
/* Read the xattr value straight into the reply's data area. */
447 lbuf->lb_buf = update_result->our_data;
448 lbuf->lb_len = OUT_UPDATE_REPLY_SIZE -
449 cfs_size_round((unsigned long)update_result->our_data -
450 (unsigned long)update_result);
451 dt_read_lock(env, obj, MOR_TGT_CHILD);
452 rc = dt_xattr_get(env, obj, lbuf, name, NULL);
453 dt_read_unlock(env, obj);
460 GOTO(out, rc = -ENOENT);
464 CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
465 tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
466 name, (int)lbuf->lb_len);
471 object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc);
/*
 * Handler for an index-lookup update.
 *
 * Parameter 0 of the update is the name to look up. With the object
 * read-locked, the object is first checked to be usable as a directory
 * (-ENOTDIR otherwise) and then dt_lookup() stores the resulting FID in
 * tti->tti_fid1. That FID and rc are inserted into the reply at the
 * current reply index.
 *
 * NOTE(review): the action taken when the object does not exist and the
 * interpretation of the dt_lookup() return value are not visible here.
 */
475 static int out_index_lookup(struct tgt_session_info *tsi)
477 const struct lu_env *env = tsi->tsi_env;
478 struct tgt_thread_info *tti = tgt_th_info(env);
479 struct object_update *update = tti->tti_u.update.tti_update;
480 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
486 if (!lu_object_exists(&obj->do_lu))
489 name = object_update_param_get(update, 0, NULL);
491 CERROR("%s: empty name for lookup: rc = %d\n",
492 tgt_name(tsi->tsi_tgt), -EPROTO);
493 RETURN(err_serious(-EPROTO));
496 dt_read_lock(env, obj, MOR_TGT_CHILD);
497 if (!dt_try_as_dir(env, obj))
498 GOTO(out_unlock, rc = -ENOTDIR);
/* The FID found for name is written to tti->tti_fid1. */
500 rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
501 (struct dt_key *)name, NULL);
504 GOTO(out_unlock, rc);
509 CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
510 PFID(lu_object_fid(&obj->do_lu)), name,
511 PFID(&tti->tti_fid1), rc);
514 dt_read_unlock(env, obj);
516 CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
517 tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
520 object_update_result_insert(tti->tti_u.update.tti_update_reply,
521 &tti->tti_fid1, sizeof(tti->tti_fid1),
522 tti->tti_u.update.tti_update_reply_index, rc);
/*
 * Exec callback for xattr set.
 *
 * Fails with -ENOENT when the object does not exist. Otherwise it sets
 * the xattr saved in arg->u.xattr_set (buffer, name, flags) with
 * dt_xattr_set() under the object's write lock. A failure on the link EA
 * (XATTR_NAME_LINK) gets special treatment. The result code is stored,
 * without payload, in the reply at arg->index.
 *
 * NOTE(review): the statement executed for a failed link EA update is
 * not visible; per the original comment the error is ignored.
 */
526 static int out_tx_xattr_set_exec(const struct lu_env *env,
530 struct dt_object *dt_obj = arg->object;
533 CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
534 dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
535 arg->u.xattr_set.name, arg->u.xattr_set.flags);
537 if (!lu_object_exists(&dt_obj->do_lu))
538 GOTO(out, rc = -ENOENT);
540 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
541 rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
542 arg->u.xattr_set.name, arg->u.xattr_set.flags,
544 dt_write_unlock(env, dt_obj);
546 * Ignore errors if this is LINK EA
548 if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
551 CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
552 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
554 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
/*
 * Declare phase of xattr set.
 *
 * Declares the xattr update on ta->ta_handle (result kept in ta->ta_err)
 * and registers out_tx_xattr_set_exec() with no undo callback. A
 * reference is taken on dt_obj. The name pointer and the lu_buf
 * descriptor are stored as given (the underlying bytes are not copied),
 * so they must stay valid until the exec callback has run.
 *
 * NOTE(review): the check of ta->ta_err after the declare call is not
 * visible here.
 */
559 static int __out_tx_xattr_set(const struct lu_env *env,
560 struct dt_object *dt_obj,
561 const struct lu_buf *buf,
562 const char *name, int flags,
563 struct thandle_exec_args *ta,
564 struct object_update_reply *reply,
565 int index, char *file, int line)
569 LASSERT(ta->ta_handle != NULL);
570 ta->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
571 flags, ta->ta_handle);
/* No undo callback is registered for xattr set. */
575 arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
577 lu_object_get(&dt_obj->do_lu);
578 arg->object = dt_obj;
579 arg->u.xattr_set.name = name;
580 arg->u.xattr_set.flags = flags;
581 arg->u.xattr_set.buf = *buf;
584 arg->u.xattr_set.csum = 0;
/*
 * Handler for an xattr-set update.
 *
 * Update parameters: 0 = xattr name, 1 = value buffer (must be present
 * and non-empty), 2 = flag word, which is byte-swapped as a 32-bit value
 * when the request needs swabbing. Any missing parameter is rejected with
 * err_serious(-EPROTO). The operation is then declared through
 * out_tx_xattr_set().
 *
 * NOTE(review): the assignments of lbuf->lb_buf and of flag are not
 * visible here.
 */
588 static int out_xattr_set(struct tgt_session_info *tsi)
590 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
591 struct object_update *update = tti->tti_u.update.tti_update;
592 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
593 struct lu_buf *lbuf = &tti->tti_buf;
602 name = object_update_param_get(update, 0, NULL);
604 CERROR("%s: empty name for xattr set: rc = %d\n",
605 tgt_name(tsi->tsi_tgt), -EPROTO);
606 RETURN(err_serious(-EPROTO));
609 buf = object_update_param_get(update, 1, &buf_len);
610 if (buf == NULL || buf_len == 0) {
611 CERROR("%s: empty buf for xattr set: rc = %d\n",
612 tgt_name(tsi->tsi_tgt), -EPROTO);
613 RETURN(err_serious(-EPROTO));
617 lbuf->lb_len = buf_len;
619 tmp = (char *)object_update_param_get(update, 2, NULL);
621 CERROR("%s: empty flag for xattr set: rc = %d\n",
622 tgt_name(tsi->tsi_tgt), -EPROTO);
623 RETURN(err_serious(-EPROTO));
626 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
627 __swab32s((__u32 *)tmp);
630 rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
632 tti->tti_u.update.tti_update_reply,
633 tti->tti_u.update.tti_update_reply_index);
/*
 * Add a reference to dt_obj with dt_ref_add() inside transaction th,
 * holding the object's write lock for the duration of the call.
 */
637 static int out_obj_ref_add(const struct lu_env *env,
638 struct dt_object *dt_obj,
643 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
644 rc = dt_ref_add(env, dt_obj, th);
645 dt_write_unlock(env, dt_obj);
/*
 * Drop a reference from dt_obj with dt_ref_del() inside transaction th,
 * holding the object's write lock for the duration of the call.
 */
650 static int out_obj_ref_del(const struct lu_env *env,
651 struct dt_object *dt_obj,
656 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
657 rc = dt_ref_del(env, dt_obj, th);
658 dt_write_unlock(env, dt_obj);
/*
 * Exec callback for ref add: calls out_obj_ref_add() on arg->object and
 * stores the result code, without payload, in the reply at arg->index.
 */
663 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
666 struct dt_object *dt_obj = arg->object;
669 rc = out_obj_ref_add(env, dt_obj, th);
671 CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
672 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
674 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
/* Undo callback for ref add: drops the reference again via out_obj_ref_del(). */
678 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
681 return out_obj_ref_del(env, arg->object, th);
/*
 * Declare phase of ref add.
 *
 * Declares the reference increment on ta->ta_handle (result kept in
 * ta->ta_err), registers out_tx_ref_add_exec()/out_tx_ref_add_undo(),
 * takes a reference on dt_obj and saves the object in the registered
 * argument.
 *
 * NOTE(review): the check of ta->ta_err after the declare call is not
 * visible here.
 */
684 static int __out_tx_ref_add(const struct lu_env *env,
685 struct dt_object *dt_obj,
686 struct thandle_exec_args *ta,
687 struct object_update_reply *reply,
688 int index, char *file, int line)
692 LASSERT(ta->ta_handle != NULL);
693 ta->ta_err = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
697 arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
700 lu_object_get(&dt_obj->do_lu);
701 arg->object = dt_obj;
708 * increase ref of the object
/*
 * Handler for a ref-add update: declares a reference increment on the
 * current target object through out_tx_ref_add(), using the thread's
 * transaction arguments (tti_tea) and the current reply slot.
 */
710 static int out_ref_add(struct tgt_session_info *tsi)
712 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
713 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
718 rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
719 tti->tti_u.update.tti_update_reply,
720 tti->tti_u.update.tti_update_reply_index);
/*
 * Exec callback for ref del: calls out_obj_ref_del() on arg->object and
 * stores the result code, without payload, in the reply at arg->index.
 * The debug message reports the same rc that is placed in the reply.
 */
724 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
727 struct dt_object *dt_obj = arg->object;
730 rc = out_obj_ref_del(env, dt_obj, th);
732 CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
733 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
735 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
/* Undo callback for ref del: re-adds the reference via out_obj_ref_add(). */
740 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
743 return out_obj_ref_add(env, arg->object, th);
/*
 * Declare phase of ref del.
 *
 * Declares the reference decrement on ta->ta_handle (result kept in
 * ta->ta_err), registers out_tx_ref_del_exec()/out_tx_ref_del_undo(),
 * takes a reference on dt_obj and saves the object in the registered
 * argument.
 *
 * NOTE(review): the check of ta->ta_err after the declare call is not
 * visible here.
 */
746 static int __out_tx_ref_del(const struct lu_env *env,
747 struct dt_object *dt_obj,
748 struct thandle_exec_args *ta,
749 struct object_update_reply *reply,
750 int index, char *file, int line)
754 LASSERT(ta->ta_handle != NULL);
755 ta->ta_err = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
759 arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
762 lu_object_get(&dt_obj->do_lu);
763 arg->object = dt_obj;
/*
 * Handler for a ref-del update: after checking that the target object
 * exists, declares a reference decrement through out_tx_ref_del().
 *
 * NOTE(review): the action taken when the object does not exist is not
 * visible here.
 */
769 static int out_ref_del(struct tgt_session_info *tsi)
771 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
772 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
777 if (!lu_object_exists(&obj->do_lu))
780 rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
781 tti->tti_u.update.tti_update_reply,
782 tti->tti_u.update.tti_update_reply_index);
/*
 * Insert the (key, rec) pair into the index of dt_obj inside transaction
 * th. The object must be usable as a directory; the dt_insert() call is
 * made under the object's write lock. The debug message prints key as a
 * string and rec as a FID.
 *
 * NOTE(review): the value returned when dt_try_as_dir() fails is not
 * visible here.
 */
786 static int out_obj_index_insert(const struct lu_env *env,
787 struct dt_object *dt_obj,
788 const struct dt_rec *rec,
789 const struct dt_key *key,
794 CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
795 dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
796 (char *)key, PFID((struct lu_fid *)rec));
798 if (dt_try_as_dir(env, dt_obj) == 0)
801 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
802 rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
803 dt_write_unlock(env, dt_obj);
/*
 * Delete the entry identified by key from the index of dt_obj inside
 * transaction th. The object must be usable as a directory; the
 * dt_delete() call is made under the object's write lock.
 *
 * NOTE(review): the value returned when dt_try_as_dir() fails is not
 * visible here.
 */
808 static int out_obj_index_delete(const struct lu_env *env,
809 struct dt_object *dt_obj,
810 const struct dt_key *key,
815 CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
816 dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
819 if (dt_try_as_dir(env, dt_obj) == 0)
822 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
823 rc = dt_delete(env, dt_obj, key, th, NULL);
824 dt_write_unlock(env, dt_obj);
/*
 * Exec callback for index insert: inserts arg->u.insert.rec under
 * arg->u.insert.key into arg->object via out_obj_index_insert() and
 * stores the result code, without payload, in the reply at arg->index.
 */
829 static int out_tx_index_insert_exec(const struct lu_env *env,
830 struct thandle *th, struct tx_arg *arg)
832 struct dt_object *dt_obj = arg->object;
835 rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
836 arg->u.insert.key, th);
838 CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
839 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
841 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
/*
 * Undo callback for index insert: removes the inserted key again via
 * out_obj_index_delete().
 */
846 static int out_tx_index_insert_undo(const struct lu_env *env,
847 struct thandle *th, struct tx_arg *arg)
849 return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
/*
 * Declare phase of index insert.
 *
 * Sets ta->ta_err to -ENOTDIR when dt_obj cannot be used as a directory.
 * Otherwise declares the insert of (name, fid), keeping the result in
 * ta->ta_err, registers out_tx_index_insert_exec()/
 * out_tx_index_insert_undo() and takes a reference on dt_obj. The fid
 * and name pointers are stored as given (not copied), so they must stay
 * valid until the exec callback has run.
 *
 * NOTE(review): the return statements and the check of ta->ta_err after
 * the declare call are not visible here.
 */
852 static int __out_tx_index_insert(const struct lu_env *env,
853 struct dt_object *dt_obj,
854 char *name, struct lu_fid *fid,
855 struct thandle_exec_args *ta,
856 struct object_update_reply *reply,
857 int index, char *file, int line)
861 LASSERT(ta->ta_handle != NULL);
863 if (dt_try_as_dir(env, dt_obj) == 0) {
864 ta->ta_err = -ENOTDIR;
868 ta->ta_err = dt_declare_insert(env, dt_obj,
869 (struct dt_rec *)fid,
870 (struct dt_key *)name,
876 arg = tx_add_exec(ta, out_tx_index_insert_exec,
877 out_tx_index_insert_undo, file,
880 lu_object_get(&dt_obj->do_lu);
881 arg->object = dt_obj;
884 arg->u.insert.rec = (struct dt_rec *)fid;
885 arg->u.insert.key = (struct dt_key *)name;
/*
 * Handler for an index-insert update.
 *
 * Update parameters: 0 = entry name, 1 = struct lu_fid of exactly
 * sizeof(*fid) bytes. The FID is byte-swapped when the request needs it
 * and must pass fid_is_sane(). Malformed input is rejected with
 * err_serious(-EPROTO). The insert is then declared through
 * out_tx_index_insert().
 */
890 static int out_index_insert(struct tgt_session_info *tsi)
892 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
893 struct object_update *update = tti->tti_u.update.tti_update;
894 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
902 name = object_update_param_get(update, 0, NULL);
904 CERROR("%s: empty name for index insert: rc = %d\n",
905 tgt_name(tsi->tsi_tgt), -EPROTO);
906 RETURN(err_serious(-EPROTO));
909 fid = object_update_param_get(update, 1, &size);
910 if (fid == NULL || size != sizeof(*fid)) {
911 CERROR("%s: invalid fid: rc = %d\n",
912 tgt_name(tsi->tsi_tgt), -EPROTO);
913 RETURN(err_serious(-EPROTO));
916 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
917 lustre_swab_lu_fid(fid);
919 if (!fid_is_sane(fid)) {
920 CERROR("%s: invalid FID "DFID": rc = %d\n",
921 tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
922 RETURN(err_serious(-EPROTO));
925 rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
927 tti->tti_u.update.tti_update_reply,
928 tti->tti_u.update.tti_update_reply_index);
/*
 * Exec callback for index delete: removes arg->u.insert.key from
 * arg->object via out_obj_index_delete() and stores the result code,
 * without payload, in the reply at arg->index. The key is kept in the
 * insert member of the argument union, as set by __out_tx_index_delete().
 */
932 static int out_tx_index_delete_exec(const struct lu_env *env,
938 rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
940 CDEBUG(D_INFO, "%s: insert idx delete reply %p index %d: rc = %d\n",
941 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
943 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
/*
 * Undo callback for index delete. Rollback is not implemented: the
 * function only logs an error quoting -ENOTSUPP.
 *
 * NOTE(review): the return statement is not visible here.
 */
948 static int out_tx_index_delete_undo(const struct lu_env *env,
952 CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
953 dt_obd_name(th->th_dev), -ENOTSUPP);
/*
 * Declare phase of index delete.
 *
 * Sets ta->ta_err to -ENOTDIR when dt_obj cannot be used as a directory.
 * Otherwise declares the deletion of name, keeping the result in
 * ta->ta_err, registers out_tx_index_delete_exec()/
 * out_tx_index_delete_undo() and takes a reference on dt_obj. The name
 * pointer is stored as given in arg->u.insert.key (the insert member of
 * the union is reused for delete).
 *
 * NOTE(review): the return statements and the check of ta->ta_err after
 * the declare call are not visible here.
 */
957 static int __out_tx_index_delete(const struct lu_env *env,
958 struct dt_object *dt_obj, char *name,
959 struct thandle_exec_args *ta,
960 struct object_update_reply *reply,
961 int index, char *file, int line)
965 if (dt_try_as_dir(env, dt_obj) == 0) {
966 ta->ta_err = -ENOTDIR;
970 LASSERT(ta->ta_handle != NULL);
971 ta->ta_err = dt_declare_delete(env, dt_obj,
972 (struct dt_key *)name,
977 arg = tx_add_exec(ta, out_tx_index_delete_exec,
978 out_tx_index_delete_undo, file,
981 lu_object_get(&dt_obj->do_lu);
982 arg->object = dt_obj;
985 arg->u.insert.key = (struct dt_key *)name;
/*
 * Handler for an index-delete update.
 *
 * After checking that the target object exists, takes the entry name
 * from update parameter 0 (a missing name is rejected with
 * err_serious(-EPROTO)) and declares the deletion through
 * out_tx_index_delete().
 *
 * NOTE(review): the action taken when the object does not exist is not
 * visible here.
 */
989 static int out_index_delete(struct tgt_session_info *tsi)
991 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
992 struct object_update *update = tti->tti_u.update.tti_update;
993 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
997 if (!lu_object_exists(&obj->do_lu))
1000 name = object_update_param_get(update, 0, NULL);
1002 CERROR("%s: empty name for index delete: rc = %d\n",
1003 tgt_name(tsi->tsi_tgt), -EPROTO);
1004 RETURN(err_serious(-EPROTO));
1007 rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
1008 tti->tti_u.update.tti_update_reply,
1009 tti->tti_u.update.tti_update_reply_index);
/*
 * Exec callback for destroy: destroys arg->object via out_obj_destroy()
 * and stores the result code, without payload, in the reply at
 * arg->index.
 */
1013 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1016 struct dt_object *dt_obj = arg->object;
1019 rc = out_obj_destroy(env, dt_obj, th);
1021 CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1022 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1024 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
/*
 * Undo callback for destroy. Rollback is not implemented: the function
 * only logs an error quoting -ENOTSUPP.
 *
 * NOTE(review): the return statement is not visible here.
 */
1029 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1032 CERROR("%s: not support destroy undo yet!: rc = %d\n",
1033 dt_obd_name(th->th_dev), -ENOTSUPP);
/*
 * Declare phase of destroy.
 *
 * Declares the destruction of dt_obj on ta->ta_handle (result kept in
 * ta->ta_err), registers out_tx_destroy_exec()/out_tx_destroy_undo(),
 * takes a reference on dt_obj and saves the object in the registered
 * argument.
 *
 * NOTE(review): the check of ta->ta_err after the declare call is not
 * visible here.
 */
1037 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1038 struct thandle_exec_args *ta,
1039 struct object_update_reply *reply,
1040 int index, char *file, int line)
1044 LASSERT(ta->ta_handle != NULL);
1045 ta->ta_err = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1049 arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1052 lu_object_get(&dt_obj->do_lu);
1053 arg->object = dt_obj;
/*
 * Handler for a destroy update.
 *
 * Validates the FID carried in the update header (update->ou_fid),
 * rejecting an insane FID with err_serious(-EPROTO), checks that the
 * target object exists, and declares the destroy through
 * out_tx_destroy().
 *
 * NOTE(review): the action taken when the object does not exist is not
 * visible here.
 */
1059 static int out_destroy(struct tgt_session_info *tsi)
1061 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
1062 struct object_update *update = tti->tti_u.update.tti_update;
1063 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
1068 fid = &update->ou_fid;
1069 if (!fid_is_sane(fid)) {
1070 CERROR("%s: invalid FID "DFID": rc = %d\n",
1071 tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1072 RETURN(err_serious(-EPROTO));
1075 if (!lu_object_exists(&obj->do_lu))
1078 rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1079 tti->tti_u.update.tti_update_reply,
1080 tti->tti_u.update.tti_update_reply_index);
/*
 * Exec callback for write: writes arg->u.write.buf at arg->u.write.pos
 * into arg->object with dt_record_write() under the object's write lock.
 * The value stored in the reply may be the buffer length (a positive
 * number); the function itself maps any positive rc to 0 on return.
 *
 * NOTE(review): the condition under which rc is replaced by the buffer
 * length is not visible; presumably it happens only on success --
 * confirm.
 */
1085 static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
1088 struct dt_object *dt_obj = arg->object;
1091 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
1092 rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
1093 &arg->u.write.pos, th);
1094 dt_write_unlock(env, dt_obj);
1097 rc = arg->u.write.buf.lb_len;
1099 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
/* A positive rc is reported to the caller as success (0). */
1101 return rc > 0 ? 0 : rc;
/*
 * Declare phase of write.
 *
 * Declares a record write of buf at pos on ta->ta_handle (result kept in
 * ta->ta_err, which is checked before continuing), registers
 * out_tx_write_exec() with no undo callback, takes a reference on dt_obj
 * and saves the object, the lu_buf descriptor and the position in the
 * registered argument. The buffer bytes are not copied, so they must
 * stay valid until the exec callback has run.
 *
 * NOTE(review): the statement executed when ta->ta_err is non-zero is
 * not visible here.
 */
1104 static int __out_tx_write(const struct lu_env *env,
1105 struct dt_object *dt_obj,
1106 const struct lu_buf *buf,
1107 loff_t pos, struct thandle_exec_args *ta,
1108 struct object_update_reply *reply,
1109 int index, char *file, int line)
1113 LASSERT(ta->ta_handle != NULL);
1114 ta->ta_err = dt_declare_record_write(env, dt_obj, buf, pos,
1116 if (ta->ta_err != 0)
/* No undo callback is registered for write. */
1119 arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
1121 lu_object_get(&dt_obj->do_lu);
1122 arg->object = dt_obj;
1123 arg->u.write.buf = *buf;
1124 arg->u.write.pos = pos;
/*
 * Handler for a write update.
 *
 * Update parameters: 0 = data buffer (must be present and non-empty),
 * 1 = write position, a 64-bit value that is byte-swapped when the
 * request needs swabbing. A missing parameter is rejected with
 * err_serious(-EPROTO); the messages name the write operation and the
 * parameter that is missing. The write is then declared through
 * out_tx_write().
 *
 * NOTE(review): the assignment of lbuf->lb_buf is not visible here.
 */
1130 static int out_write(struct tgt_session_info *tsi)
1132 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
1133 struct object_update *update = tti->tti_u.update.tti_update;
1134 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
1135 struct lu_buf *lbuf = &tti->tti_buf;
1143 buf = object_update_param_get(update, 0, &buf_len);
1144 if (buf == NULL || buf_len == 0) {
1145 CERROR("%s: empty buf for write: rc = %d\n",
1146 tgt_name(tsi->tsi_tgt), -EPROTO);
1147 RETURN(err_serious(-EPROTO));
1150 lbuf->lb_len = buf_len;
1152 tmp = (char *)object_update_param_get(update, 1, NULL);
1154 CERROR("%s: empty pos for write: rc = %d\n",
1155 tgt_name(tsi->tsi_tgt), -EPROTO);
1156 RETURN(err_serious(-EPROTO));
/* The position travels as a 64-bit value; swab it before reading. */
1159 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1160 __swab64s((__u64 *)tmp);
1161 pos = *(loff_t *)tmp;
1163 rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
1165 tti->tti_u.update.tti_update_reply,
1166 tti->tti_u.update.tti_update_reply_index);
/*
 * DEF_OUT_HNDL builds one designated-initializer entry of a handler
 * table, placed at array index (opc - OUT_CREATE) with th_flags set to
 * flags.
 *
 * NOTE(review): the remaining fields of the initializer are not visible
 * here; presumably they record name, opc and fn -- confirm.
 */
1170 #define DEF_OUT_HNDL(opc, name, flags, fn) \
1171 [opc - OUT_CREATE] = { \
1175 .th_flags = flags, \
/* Alias: out_handler expands to mdt_handler. */
1181 #define out_handler mdt_handler
/*
 * Table of OUT update handlers, indexed by (opcode - OUT_CREATE); see
 * out_handler_find(). Entries flagged MUTABOR are checked for resend by
 * out_handle() before they are run. The attr-get, xattr-get and
 * index-lookup entries do not carry MUTABOR.
 */
1182 static struct tgt_handler out_update_ops[] = {
1183 DEF_OUT_HNDL(OUT_CREATE, "out_create", MUTABOR | HABEO_REFERO,
1185 DEF_OUT_HNDL(OUT_DESTROY, "out_destroy", MUTABOR | HABEO_REFERO,
1187 DEF_OUT_HNDL(OUT_REF_ADD, "out_ref_add", MUTABOR | HABEO_REFERO,
1189 DEF_OUT_HNDL(OUT_REF_DEL, "out_ref_del", MUTABOR | HABEO_REFERO,
1191 DEF_OUT_HNDL(OUT_ATTR_SET, "out_attr_set", MUTABOR | HABEO_REFERO,
1193 DEF_OUT_HNDL(OUT_ATTR_GET, "out_attr_get", HABEO_REFERO,
1195 DEF_OUT_HNDL(OUT_XATTR_SET, "out_xattr_set", MUTABOR | HABEO_REFERO,
1197 DEF_OUT_HNDL(OUT_XATTR_GET, "out_xattr_get", HABEO_REFERO,
1199 DEF_OUT_HNDL(OUT_INDEX_LOOKUP, "out_index_lookup", HABEO_REFERO,
1201 DEF_OUT_HNDL(OUT_INDEX_INSERT, "out_index_insert",
1202 MUTABOR | HABEO_REFERO, out_index_insert),
1203 DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
1204 MUTABOR | HABEO_REFERO, out_index_delete),
1205 DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
/*
 * Look up the handler for update opcode opc.
 *
 * Opcodes in the range [OUT_CREATE, OUT_LAST) index out_update_ops at
 * (opc - OUT_CREATE); the entry's th_opc is asserted to match. Any other
 * opcode yields NULL.
 */
1208 struct tgt_handler *out_handler_find(__u32 opc)
1210 struct tgt_handler *h;
1213 if (OUT_CREATE <= opc && opc < OUT_LAST) {
1214 h = &out_update_ops[opc - OUT_CREATE];
1215 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1218 h = NULL; /* unsupported opc */
/*
 * Begin a new update transaction.
 *
 * Clears *ta and creates a transaction handle on device dt. If creation
 * fails, the error is extracted with PTR_ERR(), ta->ta_handle is reset to
 * NULL and the failure is logged. When the export requires synchronous
 * operation (exp->exp_need_sync), the handle is marked th_sync.
 *
 * NOTE(review): the assignment of ta->ta_dev and the return statements
 * are not visible here.
 */
1223 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
1224 struct thandle_exec_args *ta, struct obd_export *exp)
1226 memset(ta, 0, sizeof(*ta));
1227 ta->ta_handle = dt_trans_create(env, dt);
1228 if (IS_ERR(ta->ta_handle)) {
1231 rc = PTR_ERR(ta->ta_handle);
1232 ta->ta_handle = NULL;
1233 CERROR("%s: start handle error: rc = %d\n",
1234 dt_obd_name(dt), rc);
1238 if (exp->exp_need_sync)
1239 ta->ta_handle->th_sync = 1;
/* Start the transaction ta->ta_handle on device ta->ta_dev. */
1244 static int out_trans_start(const struct lu_env *env,
1245 struct thandle_exec_args *ta)
1247 return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
/*
 * Stop the transaction and release the objects held by its arguments.
 *
 * Records err in the handle's th_result, stops the transaction with
 * dt_trans_stop(), then walks all ta->ta_argno registered arguments and
 * drops the object reference held by each one, clearing the pointer.
 * Objects belonging to a create operation are first flagged
 * LU_OBJECT_HEARD_BANSHEE, for the reason given in the comment below.
 */
1250 static int out_trans_stop(const struct lu_env *env,
1251 struct thandle_exec_args *ta, int err)
1256 ta->ta_handle->th_result = err;
1257 rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
1258 for (i = 0; i < ta->ta_argno; i++) {
1259 if (ta->ta_args[i].object != NULL) {
1260 struct dt_object *obj = ta->ta_args[i].object;
1262 /* If the object is being created during this
1263 * transaction, we need to remove them from the
1264 * cache immediately, because a few layers are
1265 * missing in OUT handler, i.e. the object might
1266 * not be initialized in all layers */
1267 if (ta->ta_args[i].exec_fn == out_tx_create_exec)
1268 set_bit(LU_OBJECT_HEARD_BANSHEE,
1269 &obj->do_lu.lo_header->loh_flags);
1270 lu_object_put(env, &ta->ta_args[i].object->do_lu);
1271 ta->ta_args[i].object = NULL;
/*
 * Execute and finish the transaction prepared in ta.
 *
 * If a declare step already failed (ta->ta_err != 0) or nothing was
 * registered (ta->ta_argno == 0), execution is skipped and the
 * transaction is stopped with rc = ta->ta_err. Otherwise the transaction
 * is started and each registered exec_fn is called in order. When one
 * fails, the failure is logged and undo handling runs: an undo_fn is
 * called where one is registered, otherwise an error quoting -ENOTSUPP
 * is logged. Finally the session's reply fail id is set to
 * OBD_FAIL_OUT_UPDATE_NET_REP, the transaction is stopped through
 * out_trans_stop() and ta->ta_handle is cleared.
 *
 * NOTE(review): the loop that selects which operations are undone and
 * the handling of an out_trans_start() failure are not visible here.
 */
1278 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
1280 struct tgt_session_info *tsi = tgt_ses_info(env);
1283 LASSERT(ta->ta_dev);
1284 LASSERT(ta->ta_handle);
1286 if (ta->ta_err != 0 || ta->ta_argno == 0)
1287 GOTO(stop, rc = ta->ta_err);
1289 rc = out_trans_start(env, ta);
1293 for (i = 0; i < ta->ta_argno; i++) {
1294 rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
1296 if (unlikely(rc != 0)) {
1297 CDEBUG(D_INFO, "error during execution of #%u from"
1298 " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
1299 ta->ta_args[i].line, rc);
1301 if (ta->ta_args[i].undo_fn != NULL)
1302 ta->ta_args[i].undo_fn(env,
1306 CERROR("%s: undo for %s:%d: rc = %d\n",
1307 dt_obd_name(ta->ta_dev),
1308 ta->ta_args[i].file,
1309 ta->ta_args[i].line, -ENOTSUPP);
1315 /* Only fail for real update */
1316 tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
1318 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
1319 dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
1320 out_trans_stop(env, ta, rc);
1321 ta->ta_handle = NULL;
1329 * Object updates between Targets. Because all the updates have been
1330 * disassembled into object updates on the sender side, OUT will
1331 * call the OSD API directly to execute these updates.
1333 * In DNE phase I all of the updates in the request need to be executed
1334 * in one transaction, and the transaction has to be synchronous.
1336 * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
/*
 * Entry point for an OUT_UPDATE request.
 *
 * Validation: the request buffer must be present, its capsule size must
 * equal object_update_request_size(), its magic must be
 * UPDATE_REQUEST_MAGIC and it must contain at least one update; any
 * violation is rejected with err_serious(-EPROTO).
 *
 * Reply: a reply buffer of OUT_UPDATE_REPLY_SIZE bytes is packed and
 * initialised for count results.
 *
 * Execution: a transaction is started, then each update is processed in
 * order. An update is byte-swapped if needed; when its ou_batchid differs
 * from the previous update's, the current transaction is ended with
 * out_tx_end() and a new one is started. The target object is located by
 * the update's FID, and the handler for ou_type is looked up with
 * out_handler_find() and called. For handlers flagged MUTABOR,
 * out_check_resent() is consulted first. An unknown opcode ends
 * processing with -ENOTSUPP. The last transaction is finished with
 * out_tx_end().
 *
 * NOTE(review): several error checks, goto labels and the final return
 * are not visible here.
 */
1339 int out_handle(struct tgt_session_info *tsi)
1341 const struct lu_env *env = tsi->tsi_env;
1342 struct tgt_thread_info *tti = tgt_th_info(env);
1343 struct thandle_exec_args *ta = &tti->tti_tea;
1344 struct req_capsule *pill = tsi->tsi_pill;
1345 struct dt_device *dt = tsi->tsi_tgt->lut_bottom;
1346 struct object_update_request *ureq;
1347 struct object_update *update;
1348 struct object_update_reply *reply;
1351 int old_batchid = -1;
1358 req_capsule_set(pill, &RQF_OUT_UPDATE);
1359 ureq = req_capsule_client_get(pill, &RMF_OUT_UPDATE);
1361 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1363 RETURN(err_serious(-EPROTO));
/* The buffer size on the wire must match what the request describes. */
1366 bufsize = req_capsule_get_size(pill, &RMF_OUT_UPDATE, RCL_CLIENT);
1367 if (bufsize != object_update_request_size(ureq)) {
1368 CERROR("%s: invalid bufsize %d: rc = %d\n",
1369 tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1370 RETURN(err_serious(-EPROTO));
1373 if (ureq->ourq_magic != UPDATE_REQUEST_MAGIC) {
1374 CERROR("%s: invalid update buffer magic %x expect %x: "
1375 "rc = %d\n", tgt_name(tsi->tsi_tgt), ureq->ourq_magic,
1376 UPDATE_REQUEST_MAGIC, -EPROTO);
1377 RETURN(err_serious(-EPROTO));
1380 count = ureq->ourq_count;
1382 CERROR("%s: empty update: rc = %d\n", tgt_name(tsi->tsi_tgt),
1384 RETURN(err_serious(-EPROTO));
1387 req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
1388 OUT_UPDATE_REPLY_SIZE);
1389 rc = req_capsule_server_pack(pill);
1391 CERROR("%s: Can't pack response: rc = %d\n",
1392 tgt_name(tsi->tsi_tgt), rc);
1396 /* Prepare the update reply buffer */
1397 reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
1399 RETURN(err_serious(-EPROTO));
1400 object_update_reply_init(reply, count);
1401 tti->tti_u.update.tti_update_reply = reply;
1403 rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1407 tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1409 /* Walk through updates in the request to execute them synchronously */
1410 for (i = 0; i < count; i++) {
1411 struct tgt_handler *h;
1412 struct dt_object *dt_obj;
1414 update = object_update_request_get(ureq, i, NULL);
1416 GOTO(out, rc = -EPROTO);
1418 if (ptlrpc_req_need_swab(pill->rc_req))
1419 lustre_swab_object_update(update);
/* A change of batch id closes the current transaction and opens a new one. */
1421 if (old_batchid == -1) {
1422 old_batchid = update->ou_batchid;
1423 } else if (old_batchid != update->ou_batchid) {
1424 /* Stop the current update transaction,
1425 * create a new one */
1426 rc = out_tx_end(env, ta);
1430 rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1433 old_batchid = update->ou_batchid;
1436 if (!fid_is_sane(&update->ou_fid)) {
1437 CERROR("%s: invalid FID "DFID": rc = %d\n",
1438 tgt_name(tsi->tsi_tgt), PFID(&update->ou_fid),
1440 GOTO(out, rc = err_serious(-EPROTO));
1443 dt_obj = dt_locate(env, dt, &update->ou_fid);
1445 GOTO(out, rc = PTR_ERR(dt_obj));
1447 if (dt->dd_record_fid_accessed) {
1448 lfsck_pack_rfa(&tti->tti_lr,
1449 lu_object_fid(&dt_obj->do_lu));
1450 tgt_lfsck_in_notify(env, dt, &tti->tti_lr);
/* Publish the per-update context that the individual handlers read. */
1453 tti->tti_u.update.tti_dt_object = dt_obj;
1454 tti->tti_u.update.tti_update = update;
1455 tti->tti_u.update.tti_update_reply_index = i;
1457 h = out_handler_find(update->ou_type);
1458 if (likely(h != NULL)) {
1459 /* For real modification RPC, check if the update
1460 * has been executed */
1461 if (h->th_flags & MUTABOR) {
1462 struct ptlrpc_request *req = tgt_ses_req(tsi);
1464 if (out_check_resent(env, dt, dt_obj, req,
1465 out_reconstruct, reply, i))
1469 rc = h->th_act(tsi);
1471 CERROR("%s: The unsupported opc: 0x%x\n",
1472 tgt_name(tsi->tsi_tgt), update->ou_type);
1473 lu_object_put(env, &dt_obj->do_lu);
1474 GOTO(out, rc = -ENOTSUPP);
1477 lu_object_put(env, &dt_obj->do_lu);
1482 rc1 = out_tx_end(env, ta);
/*
 * Exported request handler table for the OUT service: routes OUT_UPDATE
 * requests to out_handle().
 */
1488 struct tgt_handler tgt_out_handlers[] = {
1489 TGT_UPDATE_HDL(MUTABOR, OUT_UPDATE, out_handle),
1491 EXPORT_SYMBOL(tgt_out_handlers);