4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2013, Intel Corporation.
26 * This file is part of Lustre, http://www.lustre.org/
27 * Lustre is a trademark of Sun Microsystems, Inc.
29 * lustre/mdt/out_handler.c
31 * Object update handler between targets.
33 * Author: di.wang <di.wang@intel.com>
37 # define EXPORT_SYMTAB
39 #define DEBUG_SUBSYSTEM S_MDS
41 #include "mdt_internal.h"
42 #include <lustre_update.h>
44 static const char dot[] = ".";
45 static const char dotdot[] = "..";
47 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
48 tx_exec_func_t undo, char *file, int line)
56 LASSERT(i < UPDATE_MAX_OPS);
60 ta->ta_args[i].exec_fn = func;
61 ta->ta_args[i].undo_fn = undo;
62 ta->ta_args[i].file = file;
63 ta->ta_args[i].line = line;
65 return &ta->ta_args[i];
68 static int out_tx_start(const struct lu_env *env, struct mdt_device *mdt,
69 struct thandle_exec_args *th)
71 struct dt_device *dt = mdt->mdt_bottom;
73 memset(th, 0, sizeof(*th));
74 th->ta_handle = dt_trans_create(env, dt);
75 if (IS_ERR(th->ta_handle)) {
76 CERROR("%s: start handle error: rc = %ld\n",
77 mdt2obd_dev(mdt)->obd_name, PTR_ERR(th->ta_handle));
78 return PTR_ERR(th->ta_handle);
81 /*For phase I, sync for cross-ref operation*/
82 th->ta_handle->th_sync = 1;
86 static int out_trans_start(const struct lu_env *env, struct dt_device *dt,
89 /* Always do sync commit for Phase I */
90 LASSERT(th->th_sync != 0);
91 return dt_trans_start(env, dt, th);
94 static int out_trans_stop(const struct lu_env *env,
95 struct thandle_exec_args *th, int err)
100 th->ta_handle->th_result = err;
101 LASSERT(th->ta_handle->th_sync != 0);
102 rc = dt_trans_stop(env, th->ta_dev, th->ta_handle);
103 for (i = 0; i < th->ta_argno; i++) {
104 if (th->ta_args[i].object != NULL) {
105 lu_object_put(env, &th->ta_args[i].object->do_lu);
106 th->ta_args[i].object = NULL;
113 int out_tx_end(struct mdt_thread_info *info, struct thandle_exec_args *th)
115 struct thandle_exec_args *_th = &info->mti_handle;
120 LASSERT(th->ta_handle);
122 if (th->ta_err != 0 || th->ta_argno == 0)
123 GOTO(stop, rc = th->ta_err);
125 rc = out_trans_start(info->mti_env, th->ta_dev, th->ta_handle);
129 for (i = 0; i < th->ta_argno; i++) {
130 rc = th->ta_args[i].exec_fn(info, th->ta_handle,
133 CDEBUG(D_INFO, "error during execution of #%u from"
134 " %s:%d: rc = %d\n", i, th->ta_args[i].file,
135 th->ta_args[i].line, rc);
137 LASSERTF(th->ta_args[i].undo_fn != NULL,
138 "can't undo changes, hope for failover!\n");
139 th->ta_args[i].undo_fn(info, th->ta_handle,
146 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
147 mdt_obd_name(info->mti_mdt), i, th->ta_argno, rc);
148 out_trans_stop(info->mti_env, th, rc);
149 th->ta_handle = NULL;
156 static struct dt_object *out_get_dt_obj(struct lu_object *obj)
158 struct mdt_device *mdt;
159 struct dt_device *dt;
160 struct lu_object *bottom_obj;
162 mdt = lu2mdt_dev(obj->lo_dev);
163 dt = mdt->mdt_bottom;
165 bottom_obj = lu_object_locate(obj->lo_header, dt->dd_lu_dev.ld_type);
166 if (bottom_obj == NULL)
167 return ERR_PTR(-ENOENT);
169 return lu2dt_obj(bottom_obj);
172 static struct dt_object *out_object_find(struct mdt_thread_info *info,
175 struct lu_object *obj;
176 struct dt_object *dt_obj;
178 obj = lu_object_find(info->mti_env,
179 &info->mti_mdt->mdt_md_dev.md_lu_dev,
182 return (struct dt_object *)obj;
184 dt_obj = out_get_dt_obj(obj);
185 if (IS_ERR(dt_obj)) {
186 lu_object_put(info->mti_env, obj);
187 return ERR_PTR(-ENOENT);
194 * All of the xxx_undo will be used once execution failed,
195 * But because all of the required resource has been reserved in
196 * declare phase, i.e. if declare succeed, it should make sure
197 * the following executing phase succeed in anyway, so these undo
198 * should be useless for most of the time in Phase I
200 int out_tx_create_undo(struct mdt_thread_info *info, struct thandle *th,
203 struct dt_object *dt_obj = arg->object;
206 LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
208 dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
209 rc = dt_destroy(info->mti_env, dt_obj, th);
210 dt_write_unlock(info->mti_env, dt_obj);
212 /* we don't like double failures */
214 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
215 mdt_obd_name(info->mti_mdt), rc);
219 int out_tx_create_exec(struct mdt_thread_info *info, struct thandle *th,
222 struct dt_object *dt_obj = arg->object;
225 LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
227 CDEBUG(D_OTHER, "create "DFID": dof %u, mode %o\n",
228 PFID(lu_object_fid(&arg->object->do_lu)),
229 arg->u.create.dof.dof_type,
230 arg->u.create.attr.la_mode & S_IFMT);
232 dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
233 rc = dt_create(info->mti_env, dt_obj, &arg->u.create.attr,
234 &arg->u.create.hint, &arg->u.create.dof, th);
236 dt_write_unlock(info->mti_env, dt_obj);
237 CDEBUG(D_INFO, "insert create reply mode %o index %d\n",
238 arg->u.create.attr.la_mode, arg->index);
240 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
245 static int __out_tx_create(struct mdt_thread_info *info, struct dt_object *obj,
246 struct lu_attr *attr, struct lu_fid *parent_fid,
247 struct dt_object_format *dof,
248 struct thandle_exec_args *th,
249 struct update_reply *reply,
250 int index, char *file, int line)
254 LASSERT(th->ta_handle != NULL);
255 th->ta_err = dt_declare_create(info->mti_env, obj, attr, NULL, dof,
260 arg = tx_add_exec(th, out_tx_create_exec, out_tx_create_undo, file,
264 /* release the object in out_trans_stop */
265 lu_object_get(&obj->do_lu);
267 arg->u.create.attr = *attr;
269 arg->u.create.fid = *parent_fid;
270 memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
271 arg->u.create.dof = *dof;
278 static int out_create(struct mdt_thread_info *info)
280 struct update *update = info->mti_u.update.mti_update;
281 struct dt_object *obj = info->mti_u.update.mti_dt_object;
282 struct dt_object_format *dof = &info->mti_u.update.mti_update_dof;
283 struct obdo *lobdo = &info->mti_u.update.mti_obdo;
284 struct lu_attr *attr = &info->mti_attr.ma_attr;
285 struct lu_fid *fid = NULL;
292 wobdo = update_param_buf(update, 0, &size);
293 if (wobdo == NULL || size != sizeof(*wobdo)) {
294 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
295 mdt_obd_name(info->mti_mdt), -EPROTO);
296 RETURN(err_serious(-EPROTO));
299 obdo_le_to_cpu(wobdo, wobdo);
300 lustre_get_wire_obdo(lobdo, wobdo);
301 la_from_obdo(attr, lobdo, lobdo->o_valid);
303 dof->dof_type = dt_mode_to_dft(attr->la_mode);
304 if (S_ISDIR(attr->la_mode)) {
307 fid = update_param_buf(update, 1, &size);
308 if (fid == NULL || size != sizeof(*fid)) {
309 CERROR("%s: invalid fid: rc = %d\n",
310 mdt_obd_name(info->mti_mdt), -EPROTO);
311 RETURN(err_serious(-EPROTO));
313 fid_le_to_cpu(fid, fid);
314 if (!fid_is_sane(fid)) {
315 CERROR("%s: invalid fid "DFID": rc = %d\n",
316 mdt_obd_name(info->mti_mdt),
318 RETURN(err_serious(-EPROTO));
322 rc = out_tx_create(info, obj, attr, fid, dof, &info->mti_handle,
323 info->mti_u.update.mti_update_reply,
324 info->mti_u.update.mti_update_reply_index);
329 static int out_tx_attr_set_undo(struct mdt_thread_info *info,
330 struct thandle *th, struct tx_arg *arg)
332 CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
333 mdt_obd_name(info->mti_mdt),
334 PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
339 static int out_tx_attr_set_exec(struct mdt_thread_info *info,
340 struct thandle *th, struct tx_arg *arg)
342 struct dt_object *dt_obj = arg->object;
345 LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
347 CDEBUG(D_OTHER, "attr set "DFID"\n",
348 PFID(lu_object_fid(&arg->object->do_lu)));
350 dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
351 rc = dt_attr_set(info->mti_env, dt_obj, &arg->u.attr_set.attr,
353 dt_write_unlock(info->mti_env, dt_obj);
355 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
360 static int __out_tx_attr_set(struct mdt_thread_info *info,
361 struct dt_object *dt_obj,
362 const struct lu_attr *attr,
363 struct thandle_exec_args *th,
364 struct update_reply *reply, int index,
365 char *file, int line)
369 LASSERT(th->ta_handle != NULL);
370 th->ta_err = dt_declare_attr_set(info->mti_env, dt_obj, attr,
375 arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
378 lu_object_get(&dt_obj->do_lu);
379 arg->object = dt_obj;
380 arg->u.attr_set.attr = *attr;
386 static int out_attr_set(struct mdt_thread_info *info)
388 struct update *update = info->mti_u.update.mti_update;
389 struct lu_attr *attr = &info->mti_attr.ma_attr;
390 struct dt_object *obj = info->mti_u.update.mti_dt_object;
391 struct obdo *lobdo = &info->mti_u.update.mti_obdo;
398 wobdo = update_param_buf(update, 0, &size);
399 if (wobdo == NULL || size != sizeof(*wobdo)) {
400 CERROR("%s: empty obdo in the update: rc = %d\n",
401 mdt_obd_name(info->mti_mdt), -EPROTO);
402 RETURN(err_serious(-EPROTO));
407 obdo_le_to_cpu(wobdo, wobdo);
408 lustre_get_wire_obdo(lobdo, wobdo);
409 la_from_obdo(attr, lobdo, lobdo->o_valid);
411 rc = out_tx_attr_set(info, obj, attr, &info->mti_handle,
412 info->mti_u.update.mti_update_reply,
413 info->mti_u.update.mti_update_reply_index);
418 static int out_attr_get(struct mdt_thread_info *info)
420 struct obdo *obdo = &info->mti_u.update.mti_obdo;
421 const struct lu_env *env = info->mti_env;
422 struct lu_attr *la = &info->mti_attr.ma_attr;
423 struct dt_object *obj = info->mti_u.update.mti_dt_object;
428 if (!lu_object_exists(&obj->do_lu))
431 dt_read_lock(env, obj, MOR_TGT_CHILD);
432 rc = dt_attr_get(env, obj, la, NULL);
434 GOTO(out_unlock, rc);
436 * If it is a directory, we will also check whether the
437 * directory is empty.
438 * la_flags = 0 : Empty.
442 if (S_ISDIR(la->la_mode)) {
444 const struct dt_it_ops *iops;
446 if (!dt_try_as_dir(env, obj))
447 GOTO(out_unlock, rc = -ENOTDIR);
449 iops = &obj->do_index_ops->dio_it;
450 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
453 result = iops->get(env, it, (const void *)"");
456 for (result = 0, i = 0; result == 0 && i < 3;
458 result = iops->next(env, it);
461 } else if (result == 0)
463 * Huh? Index contains no zero key?
473 obdo_from_la(obdo, la, la->la_valid);
474 obdo_cpu_to_le(obdo, obdo);
475 lustre_set_wire_obdo(obdo, obdo);
478 dt_read_unlock(env, obj);
479 update_insert_reply(info->mti_u.update.mti_update_reply, obdo,
480 sizeof(*obdo), 0, rc);
484 static int out_xattr_get(struct mdt_thread_info *info)
486 struct update *update = info->mti_u.update.mti_update;
487 const struct lu_env *env = info->mti_env;
488 struct lu_buf *lbuf = &info->mti_buf;
489 struct update_reply *reply = info->mti_u.update.mti_update_reply;
490 struct dt_object *obj = info->mti_u.update.mti_dt_object;
497 name = (char *)update_param_buf(update, 0, NULL);
499 CERROR("%s: empty name for xattr get: rc = %d\n",
500 mdt_obd_name(info->mti_mdt), -EPROTO);
501 RETURN(err_serious(-EPROTO));
504 ptr = update_get_buf_internal(reply, 0, NULL);
505 LASSERT(ptr != NULL);
507 /* The first 4 bytes(int) are used to store the result */
508 lbuf->lb_buf = (char *)ptr + sizeof(int);
509 lbuf->lb_len = UPDATE_BUFFER_SIZE - sizeof(struct update_reply);
510 dt_read_lock(env, obj, MOR_TGT_CHILD);
511 rc = dt_xattr_get(env, obj, lbuf, name, NULL);
512 dt_read_unlock(env, obj);
519 GOTO(out, rc = -ENOENT);
523 CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
524 mdt_obd_name(info->mti_mdt), PFID(lu_object_fid(&obj->do_lu)),
525 name, (int)lbuf->lb_len);
528 reply->ur_lens[0] = lbuf->lb_len + sizeof(int);
532 static int out_index_lookup(struct mdt_thread_info *info)
534 struct update *update = info->mti_u.update.mti_update;
535 const struct lu_env *env = info->mti_env;
536 struct dt_object *obj = info->mti_u.update.mti_dt_object;
542 if (!lu_object_exists(&obj->do_lu))
545 name = (char *)update_param_buf(update, 0, NULL);
547 CERROR("%s: empty name for lookup: rc = %d\n",
548 mdt_obd_name(info->mti_mdt), -EPROTO);
549 RETURN(err_serious(-EPROTO));
552 dt_read_lock(env, obj, MOR_TGT_CHILD);
553 if (!dt_try_as_dir(env, obj))
554 GOTO(out_unlock, rc = -ENOTDIR);
556 rc = dt_lookup(env, obj, (struct dt_rec *)&info->mti_tmp_fid1,
557 (struct dt_key *)name, NULL);
560 GOTO(out_unlock, rc);
565 CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
566 PFID(lu_object_fid(&obj->do_lu)), name,
567 PFID(&info->mti_tmp_fid1), rc);
568 fid_cpu_to_le(&info->mti_tmp_fid1, &info->mti_tmp_fid1);
571 dt_read_unlock(env, obj);
572 update_insert_reply(info->mti_u.update.mti_update_reply,
573 &info->mti_tmp_fid1, sizeof(info->mti_tmp_fid1),
578 static int out_tx_xattr_set_exec(struct mdt_thread_info *info,
582 struct dt_object *dt_obj = arg->object;
585 LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
587 CDEBUG(D_OTHER, "attr set "DFID"\n",
588 PFID(lu_object_fid(&arg->object->do_lu)));
590 dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
591 rc = dt_xattr_set(info->mti_env, dt_obj, &arg->u.xattr_set.buf,
592 arg->u.xattr_set.name, arg->u.xattr_set.flags,
594 dt_write_unlock(info->mti_env, dt_obj);
596 * Ignore errors if this is LINK EA
598 if (unlikely(rc && !strncmp(arg->u.xattr_set.name, XATTR_NAME_LINK,
599 strlen(XATTR_NAME_LINK))))
602 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
604 CDEBUG(D_INFO, "set xattr buf %p name %s flag %d\n",
605 arg->u.xattr_set.buf.lb_buf, arg->u.xattr_set.name,
606 arg->u.xattr_set.flags);
611 static int __out_tx_xattr_set(struct mdt_thread_info *info,
612 struct dt_object *dt_obj,
613 const struct lu_buf *buf,
614 const char *name, int flags,
615 struct thandle_exec_args *th,
616 struct update_reply *reply, int index,
617 char *file, int line)
621 LASSERT(th->ta_handle != NULL);
622 th->ta_err = dt_declare_xattr_set(info->mti_env, dt_obj, buf, name,
623 flags, th->ta_handle);
627 arg = tx_add_exec(th, out_tx_xattr_set_exec, NULL, file, line);
629 lu_object_get(&dt_obj->do_lu);
630 arg->object = dt_obj;
631 arg->u.xattr_set.name = name;
632 arg->u.xattr_set.flags = flags;
633 arg->u.xattr_set.buf = *buf;
636 arg->u.xattr_set.csum = 0;
640 static int out_xattr_set(struct mdt_thread_info *info)
642 struct update *update = info->mti_u.update.mti_update;
643 struct dt_object *obj = info->mti_u.update.mti_dt_object;
644 struct lu_buf *lbuf = &info->mti_buf;
653 name = update_param_buf(update, 0, NULL);
655 CERROR("%s: empty name for xattr set: rc = %d\n",
656 mdt_obd_name(info->mti_mdt), -EPROTO);
657 RETURN(err_serious(-EPROTO));
660 buf = (char *)update_param_buf(update, 1, &buf_len);
661 if (buf == NULL || buf_len == 0) {
662 CERROR("%s: empty buf for xattr set: rc = %d\n",
663 mdt_obd_name(info->mti_mdt), -EPROTO);
664 RETURN(err_serious(-EPROTO));
668 lbuf->lb_len = buf_len;
670 tmp = (char *)update_param_buf(update, 2, NULL);
672 CERROR("%s: empty flag for xattr set: rc = %d\n",
673 mdt_obd_name(info->mti_mdt), -EPROTO);
674 RETURN(err_serious(-EPROTO));
677 flag = le32_to_cpu(*(int *)tmp);
679 rc = out_tx_xattr_set(info, obj, lbuf, name, flag, &info->mti_handle,
680 info->mti_u.update.mti_update_reply,
681 info->mti_u.update.mti_update_reply_index);
686 static int out_tx_ref_add_exec(struct mdt_thread_info *info, struct thandle *th,
689 struct dt_object *dt_obj = arg->object;
691 LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
693 CDEBUG(D_OTHER, "ref add "DFID"\n",
694 PFID(lu_object_fid(&arg->object->do_lu)));
696 dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
697 dt_ref_add(info->mti_env, dt_obj, th);
698 dt_write_unlock(info->mti_env, dt_obj);
700 update_insert_reply(arg->reply, NULL, 0, arg->index, 0);
704 static int out_tx_ref_add_undo(struct mdt_thread_info *info, struct thandle *th,
707 struct dt_object *dt_obj = arg->object;
709 LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
711 CDEBUG(D_OTHER, "ref del "DFID"\n",
712 PFID(lu_object_fid(&arg->object->do_lu)));
714 dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
715 dt_ref_del(info->mti_env, dt_obj, th);
716 dt_write_unlock(info->mti_env, dt_obj);
721 static int __out_tx_ref_add(struct mdt_thread_info *info,
722 struct dt_object *dt_obj,
723 struct thandle_exec_args *th,
724 struct update_reply *reply,
725 int index, char *file, int line)
729 LASSERT(th->ta_handle != NULL);
730 th->ta_err = dt_declare_ref_add(info->mti_env, dt_obj,
735 arg = tx_add_exec(th, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
738 lu_object_get(&dt_obj->do_lu);
739 arg->object = dt_obj;
746 * increase ref of the object
748 static int out_ref_add(struct mdt_thread_info *info)
750 struct dt_object *obj = info->mti_u.update.mti_dt_object;
755 rc = out_tx_ref_add(info, obj, &info->mti_handle,
756 info->mti_u.update.mti_update_reply,
757 info->mti_u.update.mti_update_reply_index);
761 static int out_tx_ref_del_exec(struct mdt_thread_info *info, struct thandle *th,
764 out_tx_ref_add_undo(info, th, arg);
765 update_insert_reply(arg->reply, NULL, 0, arg->index, 0);
769 static int out_tx_ref_del_undo(struct mdt_thread_info *info, struct thandle *th,
772 return out_tx_ref_add_exec(info, th, arg);
775 static int __out_tx_ref_del(struct mdt_thread_info *info,
776 struct dt_object *dt_obj,
777 struct thandle_exec_args *th,
778 struct update_reply *reply,
779 int index, char *file, int line)
783 LASSERT(th->ta_handle != NULL);
784 th->ta_err = dt_declare_ref_del(info->mti_env, dt_obj, th->ta_handle);
788 arg = tx_add_exec(th, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
791 lu_object_get(&dt_obj->do_lu);
792 arg->object = dt_obj;
798 static int out_ref_del(struct mdt_thread_info *info)
800 struct dt_object *obj = info->mti_u.update.mti_dt_object;
805 if (!lu_object_exists(&obj->do_lu))
808 rc = out_tx_ref_del(info, obj, &info->mti_handle,
809 info->mti_u.update.mti_update_reply,
810 info->mti_u.update.mti_update_reply_index);
814 static int out_tx_index_insert_exec(struct mdt_thread_info *info,
815 struct thandle *th, struct tx_arg *arg)
817 struct dt_object *dt_obj = arg->object;
820 LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
822 CDEBUG(D_OTHER, "index insert "DFID" name: %s fid "DFID"\n",
823 PFID(lu_object_fid(&arg->object->do_lu)),
824 (char *)arg->u.insert.key,
825 PFID((struct lu_fid *)arg->u.insert.rec));
827 if (dt_try_as_dir(info->mti_env, dt_obj) == 0)
830 dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
831 rc = dt_insert(info->mti_env, dt_obj, arg->u.insert.rec,
832 arg->u.insert.key, th, NULL, 0);
833 dt_write_unlock(info->mti_env, dt_obj);
835 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
840 static int out_tx_index_insert_undo(struct mdt_thread_info *info,
841 struct thandle *th, struct tx_arg *arg)
843 struct dt_object *dt_obj = arg->object;
846 LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
848 CDEBUG(D_OTHER, "index delete "DFID" name: %s\n",
849 PFID(lu_object_fid(&arg->object->do_lu)),
850 (char *)arg->u.insert.key);
852 if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
853 CERROR("%s: "DFID" is not directory: rc = %d\n",
854 mdt_obd_name(info->mti_mdt),
855 PFID(lu_object_fid(&dt_obj->do_lu)), -ENOTDIR);
859 dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
860 rc = dt_delete(info->mti_env, dt_obj, arg->u.insert.key, th, NULL);
861 dt_write_unlock(info->mti_env, dt_obj);
863 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
868 static int __out_tx_index_insert(struct mdt_thread_info *info,
869 struct dt_object *dt_obj,
870 char *name, struct lu_fid *fid,
871 struct thandle_exec_args *th,
872 struct update_reply *reply,
873 int index, char *file, int line)
877 LASSERT(th->ta_handle != NULL);
879 if (lu_object_exists(&dt_obj->do_lu)) {
880 if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
881 th->ta_err = -ENOTDIR;
884 th->ta_err = dt_declare_insert(info->mti_env, dt_obj,
885 (struct dt_rec *)fid,
886 (struct dt_key *)name,
893 arg = tx_add_exec(th, out_tx_index_insert_exec,
894 out_tx_index_insert_undo, file,
897 lu_object_get(&dt_obj->do_lu);
898 arg->object = dt_obj;
901 arg->u.insert.rec = (struct dt_rec *)fid;
902 arg->u.insert.key = (struct dt_key *)name;
907 static int out_index_insert(struct mdt_thread_info *info)
909 struct update *update = info->mti_u.update.mti_update;
910 struct dt_object *obj = info->mti_u.update.mti_dt_object;
917 name = (char *)update_param_buf(update, 0, NULL);
919 CERROR("%s: empty name for index insert: rc = %d\n",
920 mdt_obd_name(info->mti_mdt), -EPROTO);
921 RETURN(err_serious(-EPROTO));
924 fid = (struct lu_fid *)update_param_buf(update, 1, &size);
925 if (fid == NULL || size != sizeof(*fid)) {
926 CERROR("%s: invalid fid: rc = %d\n",
927 mdt_obd_name(info->mti_mdt), -EPROTO);
928 RETURN(err_serious(-EPROTO));
931 fid_le_to_cpu(fid, fid);
932 if (!fid_is_sane(fid)) {
933 CERROR("%s: invalid FID "DFID": rc = %d\n",
934 mdt_obd_name(info->mti_mdt), PFID(fid),
936 RETURN(err_serious(-EPROTO));
939 rc = out_tx_index_insert(info, obj, name, fid, &info->mti_handle,
940 info->mti_u.update.mti_update_reply,
941 info->mti_u.update.mti_update_reply_index);
945 static int out_tx_index_delete_exec(struct mdt_thread_info *info,
949 return out_tx_index_insert_undo(info, th, arg);
952 static int out_tx_index_delete_undo(struct mdt_thread_info *info,
956 return out_tx_index_insert_exec(info, th, arg);
959 static int __out_tx_index_delete(struct mdt_thread_info *info,
960 struct dt_object *dt_obj, char *name,
961 struct thandle_exec_args *th,
962 struct update_reply *reply,
963 int index, char *file, int line)
967 if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
968 th->ta_err = -ENOTDIR;
972 LASSERT(th->ta_handle != NULL);
973 th->ta_err = dt_declare_delete(info->mti_env, dt_obj,
974 (struct dt_key *)name,
979 arg = tx_add_exec(th, out_tx_index_delete_exec,
980 out_tx_index_delete_undo, file,
983 lu_object_get(&dt_obj->do_lu);
984 arg->object = dt_obj;
987 arg->u.insert.key = (struct dt_key *)name;
991 static int out_index_delete(struct mdt_thread_info *info)
993 struct update *update = info->mti_u.update.mti_update;
994 struct dt_object *obj = info->mti_u.update.mti_dt_object;
998 if (!lu_object_exists(&obj->do_lu))
1000 name = (char *)update_param_buf(update, 0, NULL);
1002 CERROR("%s: empty name for index delete: rc = %d\n",
1003 mdt_obd_name(info->mti_mdt), -EPROTO);
1004 RETURN(err_serious(-EPROTO));
1007 rc = out_tx_index_delete(info, obj, name, &info->mti_handle,
1008 info->mti_u.update.mti_update_reply,
1009 info->mti_u.update.mti_update_reply_index);
1013 #define DEF_OUT_HNDL(opc, name, fail_id, flags, fn) \
1014 [opc - OBJ_CREATE] = { \
1016 .mh_fail_id = fail_id, \
1018 .mh_flags = flags, \
1023 #define out_handler mdt_handler
1024 static struct out_handler out_update_ops[] = {
1025 DEF_OUT_HNDL(OBJ_CREATE, "obj_create", 0, MUTABOR | HABEO_REFERO,
1027 DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", 0, MUTABOR | HABEO_REFERO,
1029 DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", 0, MUTABOR | HABEO_REFERO,
1031 DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set", 0, MUTABOR | HABEO_REFERO,
1033 DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get", 0, HABEO_REFERO,
1035 DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", 0, MUTABOR | HABEO_REFERO,
1037 DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", 0, HABEO_REFERO,
1039 DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", 0,
1040 HABEO_REFERO, out_index_lookup),
1041 DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert", 0,
1042 MUTABOR | HABEO_REFERO, out_index_insert),
1043 DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete", 0,
1044 MUTABOR | HABEO_REFERO, out_index_delete),
1047 #define out_opc_slice mdt_opc_slice
1048 static struct out_opc_slice out_handlers[] = {
1050 .mos_opc_start = OBJ_CREATE,
1051 .mos_opc_end = OBJ_LAST,
1052 .mos_hs = out_update_ops
1057 * Object updates between Targets. Because all the updates has been
1058 * dis-assemblied into object updates in master MDD layer, so out
1059 * will skip MDD layer, and call OSD API directly to execute these
1062 * In phase I, all of the updates in the request need to be executed
1063 * in one transaction, and the transaction has to be synchronously.
1065 * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1068 int out_handle(struct mdt_thread_info *info)
1070 struct req_capsule *pill = info->mti_pill;
1071 struct update_buf *ubuf;
1072 struct update *update;
1073 struct thandle_exec_args *th = &info->mti_handle;
1082 req_capsule_set(pill, &RQF_UPDATE_OBJ);
1083 bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
1084 if (bufsize != UPDATE_BUFFER_SIZE) {
1085 CERROR("%s: invalid bufsize %d: rc = %d\n",
1086 mdt_obd_name(info->mti_mdt), bufsize, -EPROTO);
1087 RETURN(err_serious(-EPROTO));
1090 ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
1092 CERROR("%s: No buf!: rc = %d\n", mdt_obd_name(info->mti_mdt),
1094 RETURN(err_serious(-EPROTO));
1097 if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
1098 CERROR("%s: invalid magic %x expect %x: rc = %d\n",
1099 mdt_obd_name(info->mti_mdt), le32_to_cpu(ubuf->ub_magic),
1100 UPDATE_BUFFER_MAGIC, -EPROTO);
1101 RETURN(err_serious(-EPROTO));
1104 count = le32_to_cpu(ubuf->ub_count);
1106 CERROR("%s: No update!: rc = %d\n",
1107 mdt_obd_name(info->mti_mdt), -EPROTO);
1108 RETURN(err_serious(-EPROTO));
1111 req_capsule_set_size(pill, &RMF_UPDATE_REPLY, RCL_SERVER,
1112 UPDATE_BUFFER_SIZE);
1113 rc = req_capsule_server_pack(pill);
1115 CERROR("%s: Can't pack response: rc = %d\n",
1116 mdt_obd_name(info->mti_mdt), rc);
1120 /* Prepare the update reply buffer */
1121 info->mti_u.update.mti_update_reply =
1122 req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
1123 update_init_reply_buf(info->mti_u.update.mti_update_reply, count);
1125 rc = out_tx_start(info->mti_env, info->mti_mdt, th);
1129 /* Walk through updates in the request to execute them synchronously */
1130 off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
1131 for (i = 0; i < count; i++) {
1132 struct out_handler *h;
1133 struct dt_object *dt_obj;
1135 update = (struct update *)((char *)ubuf + off);
1137 fid_le_to_cpu(&update->u_fid, &update->u_fid);
1138 if (!fid_is_sane(&update->u_fid)) {
1139 CERROR("%s: invalid FID "DFID": rc = %d\n",
1140 mdt_obd_name(info->mti_mdt),
1141 PFID(&update->u_fid), -EPROTO);
1142 GOTO(out, rc = err_serious(-EPROTO));
1144 dt_obj = out_object_find(info, &update->u_fid);
1146 GOTO(out, rc = PTR_ERR(dt_obj));
1148 info->mti_u.update.mti_dt_object = dt_obj;
1149 info->mti_u.update.mti_update = update;
1150 info->mti_u.update.mti_update_reply_index = i;
1152 h = mdt_handler_find(update->u_type, out_handlers);
1153 if (likely(h != NULL)) {
1154 rc = h->mh_act(info);
1156 CERROR("%s: The unsupported opc: 0x%x\n",
1157 mdt_obd_name(info->mti_mdt), update->u_type);
1158 lu_object_put(info->mti_env, &dt_obj->do_lu);
1159 GOTO(out, rc = -ENOTSUPP);
1161 lu_object_put(info->mti_env, &dt_obj->do_lu);
1164 off += cfs_size_round(update_size(update));
1168 rc1 = out_tx_end(info, th);
1169 rc = rc == 0 ? rc1 : rc;
1170 info->mti_fail_id = OBD_FAIL_UPDATE_OBJ_NET;