Whamcloud - gitweb
LU-1866 lfsck: enhance otable-based iteration
[fs/lustre-release.git] / lustre / mdt / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * This file is part of Lustre, http://www.lustre.org/
27  * Lustre is a trademark of Sun Microsystems, Inc.
28  *
29  * lustre/mdt/out_handler.c
30  *
31  * Object update handler between targets.
32  *
33  * Author: di.wang <di.wang@intel.com>
34  */
35
36 #ifndef EXPORT_SYMTAB
37 # define EXPORT_SYMTAB
38 #endif
39 #define DEBUG_SUBSYSTEM S_MDS
40
41 #include "mdt_internal.h"
42 #include <lustre_update.h>
43
44 static const char dot[] = ".";
45 static const char dotdot[] = "..";
46
47 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
48                            tx_exec_func_t undo, char *file, int line)
49 {
50         int i;
51
52         LASSERT(ta);
53         LASSERT(func);
54
55         i = ta->ta_argno;
56         LASSERT(i < UPDATE_MAX_OPS);
57
58         ta->ta_argno++;
59
60         ta->ta_args[i].exec_fn = func;
61         ta->ta_args[i].undo_fn = undo;
62         ta->ta_args[i].file    = file;
63         ta->ta_args[i].line    = line;
64
65         return &ta->ta_args[i];
66 }
67
68 static int out_tx_start(const struct lu_env *env, struct mdt_device *mdt,
69                         struct thandle_exec_args *th)
70 {
71         struct dt_device *dt = mdt->mdt_bottom;
72
73         memset(th, 0, sizeof(*th));
74         th->ta_handle = dt_trans_create(env, dt);
75         if (IS_ERR(th->ta_handle)) {
76                 CERROR("%s: start handle error: rc = %ld\n",
77                        mdt2obd_dev(mdt)->obd_name, PTR_ERR(th->ta_handle));
78                 return PTR_ERR(th->ta_handle);
79         }
80         th->ta_dev = dt;
81         /*For phase I, sync for cross-ref operation*/
82         th->ta_handle->th_sync = 1;
83         return 0;
84 }
85
86 static int out_trans_start(const struct lu_env *env, struct dt_device *dt,
87                            struct thandle *th)
88 {
89         /* Always do sync commit for Phase I */
90         LASSERT(th->th_sync != 0);
91         return dt_trans_start(env, dt, th);
92 }
93
94 static int out_trans_stop(const struct lu_env *env,
95                           struct thandle_exec_args *th, int err)
96 {
97         int i;
98         int rc;
99
100         th->ta_handle->th_result = err;
101         LASSERT(th->ta_handle->th_sync != 0);
102         rc = dt_trans_stop(env, th->ta_dev, th->ta_handle);
103         for (i = 0; i < th->ta_argno; i++) {
104                 if (th->ta_args[i].object != NULL) {
105                         lu_object_put(env, &th->ta_args[i].object->do_lu);
106                         th->ta_args[i].object = NULL;
107                 }
108         }
109
110         return rc;
111 }
112
113 int out_tx_end(struct mdt_thread_info *info, struct thandle_exec_args *th)
114 {
115         struct thandle_exec_args *_th = &info->mti_handle;
116         int i = 0, rc;
117
118         LASSERT(th == _th);
119         LASSERT(th->ta_dev);
120         LASSERT(th->ta_handle);
121
122         if (th->ta_err != 0 || th->ta_argno == 0)
123                 GOTO(stop, rc = th->ta_err);
124
125         rc = out_trans_start(info->mti_env, th->ta_dev, th->ta_handle);
126         if (unlikely(rc))
127                 GOTO(stop, rc);
128
129         for (i = 0; i < th->ta_argno; i++) {
130                 rc = th->ta_args[i].exec_fn(info, th->ta_handle,
131                                             &th->ta_args[i]);
132                 if (unlikely(rc)) {
133                         CDEBUG(D_INFO, "error during execution of #%u from"
134                                " %s:%d: rc = %d\n", i, th->ta_args[i].file,
135                                th->ta_args[i].line, rc);
136                         while (--i >= 0) {
137                                 LASSERTF(th->ta_args[i].undo_fn != NULL,
138                                     "can't undo changes, hope for failover!\n");
139                                 th->ta_args[i].undo_fn(info, th->ta_handle,
140                                                        &th->ta_args[i]);
141                         }
142                         break;
143                 }
144         }
145 stop:
146         CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
147                mdt_obd_name(info->mti_mdt), i, th->ta_argno, rc);
148         out_trans_stop(info->mti_env, th, rc);
149         th->ta_handle = NULL;
150         th->ta_argno = 0;
151         th->ta_err = 0;
152
153         RETURN(rc);
154 }
155
156 static struct dt_object *out_get_dt_obj(struct lu_object *obj)
157 {
158         struct mdt_device *mdt;
159         struct dt_device *dt;
160         struct lu_object *bottom_obj;
161
162         mdt = lu2mdt_dev(obj->lo_dev);
163         dt = mdt->mdt_bottom;
164
165         bottom_obj = lu_object_locate(obj->lo_header, dt->dd_lu_dev.ld_type);
166         if (bottom_obj == NULL)
167                 return ERR_PTR(-ENOENT);
168
169         return lu2dt_obj(bottom_obj);
170 }
171
172 static struct dt_object *out_object_find(struct mdt_thread_info *info,
173                                          struct lu_fid *fid)
174 {
175         struct lu_object        *obj;
176         struct dt_object        *dt_obj;
177
178         obj = lu_object_find(info->mti_env,
179                              &info->mti_mdt->mdt_md_dev.md_lu_dev,
180                              fid, NULL);
181         if (IS_ERR(obj))
182                 return (struct dt_object *)obj;
183
184         dt_obj = out_get_dt_obj(obj);
185         if (IS_ERR(dt_obj)) {
186                 lu_object_put(info->mti_env, obj);
187                 return ERR_PTR(-ENOENT);
188         }
189
190         return dt_obj;
191 }
192
193 /**
194  * All of the xxx_undo will be used once execution failed,
195  * But because all of the required resource has been reserved in
196  * declare phase, i.e. if declare succeed, it should make sure
197  * the following executing phase succeed in anyway, so these undo
198  * should be useless for most of the time in Phase I
199  **/
200 int out_tx_create_undo(struct mdt_thread_info *info, struct thandle *th,
201                        struct tx_arg *arg)
202 {
203         struct dt_object *dt_obj = arg->object;
204         int rc;
205
206         LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
207
208         dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
209         rc = dt_destroy(info->mti_env, dt_obj, th);
210         dt_write_unlock(info->mti_env, dt_obj);
211
212         /* we don't like double failures */
213         if (rc != 0)
214                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
215                        mdt_obd_name(info->mti_mdt), rc);
216         return rc;
217 }
218
219 int out_tx_create_exec(struct mdt_thread_info *info, struct thandle *th,
220                        struct tx_arg *arg)
221 {
222         struct dt_object *dt_obj = arg->object;
223         int rc;
224
225         LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
226
227         CDEBUG(D_OTHER, "create "DFID": dof %u, mode %o\n",
228                PFID(lu_object_fid(&arg->object->do_lu)),
229                arg->u.create.dof.dof_type,
230                arg->u.create.attr.la_mode & S_IFMT);
231
232         dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
233         rc = dt_create(info->mti_env, dt_obj, &arg->u.create.attr,
234                        &arg->u.create.hint, &arg->u.create.dof, th);
235
236         dt_write_unlock(info->mti_env, dt_obj);
237         CDEBUG(D_INFO, "insert create reply mode %o index %d\n",
238                arg->u.create.attr.la_mode, arg->index);
239
240         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
241
242         return rc;
243 }
244
245 static int __out_tx_create(struct mdt_thread_info *info, struct dt_object *obj,
246                            struct lu_attr *attr, struct lu_fid *parent_fid,
247                            struct dt_object_format *dof,
248                            struct thandle_exec_args *th,
249                            struct update_reply *reply,
250                            int index, char *file, int line)
251 {
252         struct tx_arg *arg;
253
254         LASSERT(th->ta_handle != NULL);
255         th->ta_err = dt_declare_create(info->mti_env, obj, attr, NULL, dof,
256                                        th->ta_handle);
257         if (th->ta_err != 0)
258                 return th->ta_err;
259
260         arg = tx_add_exec(th, out_tx_create_exec, out_tx_create_undo, file,
261                           line);
262         LASSERT(arg);
263
264         /* release the object in out_trans_stop */
265         lu_object_get(&obj->do_lu);
266         arg->object = obj;
267         arg->u.create.attr = *attr;
268         if (parent_fid)
269                 arg->u.create.fid = *parent_fid;
270         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
271         arg->u.create.dof  = *dof;
272         arg->reply = reply;
273         arg->index = index;
274
275         return 0;
276 }
277
278 static int out_create(struct mdt_thread_info *info)
279 {
280         struct update           *update = info->mti_u.update.mti_update;
281         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
282         struct dt_object_format *dof = &info->mti_u.update.mti_update_dof;
283         struct obdo             *lobdo = &info->mti_u.update.mti_obdo;
284         struct lu_attr          *attr = &info->mti_attr.ma_attr;
285         struct lu_fid           *fid = NULL;
286         struct obdo             *wobdo;
287         int                     size;
288         int                     rc;
289
290         ENTRY;
291
292         wobdo = update_param_buf(update, 0, &size);
293         if (wobdo == NULL || size != sizeof(*wobdo)) {
294                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
295                        mdt_obd_name(info->mti_mdt), -EPROTO);
296                 RETURN(err_serious(-EPROTO));
297         }
298
299         obdo_le_to_cpu(wobdo, wobdo);
300         lustre_get_wire_obdo(lobdo, wobdo);
301         la_from_obdo(attr, lobdo, lobdo->o_valid);
302
303         dof->dof_type = dt_mode_to_dft(attr->la_mode);
304         if (S_ISDIR(attr->la_mode)) {
305                 int size;
306
307                 fid = update_param_buf(update, 1, &size);
308                 if (fid == NULL || size != sizeof(*fid)) {
309                         CERROR("%s: invalid fid: rc = %d\n",
310                                mdt_obd_name(info->mti_mdt), -EPROTO);
311                         RETURN(err_serious(-EPROTO));
312                 }
313                 fid_le_to_cpu(fid, fid);
314                 if (!fid_is_sane(fid)) {
315                         CERROR("%s: invalid fid "DFID": rc = %d\n",
316                                mdt_obd_name(info->mti_mdt),
317                                PFID(fid), -EPROTO);
318                         RETURN(err_serious(-EPROTO));
319                 }
320         }
321
322         rc = out_tx_create(info, obj, attr, fid, dof, &info->mti_handle,
323                            info->mti_u.update.mti_update_reply,
324                            info->mti_u.update.mti_update_reply_index);
325
326         RETURN(rc);
327 }
328
329 static int out_tx_attr_set_undo(struct mdt_thread_info *info,
330                                 struct thandle *th, struct tx_arg *arg)
331 {
332         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
333                mdt_obd_name(info->mti_mdt),
334                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
335
336         return -ENOTSUPP;
337 }
338
339 static int out_tx_attr_set_exec(struct mdt_thread_info *info,
340                                 struct thandle *th, struct tx_arg *arg)
341 {
342         struct dt_object        *dt_obj = arg->object;
343         int                     rc;
344
345         LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
346
347         CDEBUG(D_OTHER, "attr set "DFID"\n",
348                PFID(lu_object_fid(&arg->object->do_lu)));
349
350         dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
351         rc = dt_attr_set(info->mti_env, dt_obj, &arg->u.attr_set.attr,
352                          th, NULL);
353         dt_write_unlock(info->mti_env, dt_obj);
354
355         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
356
357         return rc;
358 }
359
360 static int __out_tx_attr_set(struct mdt_thread_info *info,
361                              struct dt_object *dt_obj,
362                              const struct lu_attr *attr,
363                              struct thandle_exec_args *th,
364                              struct update_reply *reply, int index,
365                              char *file, int line)
366 {
367         struct tx_arg           *arg;
368
369         LASSERT(th->ta_handle != NULL);
370         th->ta_err = dt_declare_attr_set(info->mti_env, dt_obj, attr,
371                                          th->ta_handle);
372         if (th->ta_err != 0)
373                 return th->ta_err;
374
375         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
376                           file, line);
377         LASSERT(arg);
378         lu_object_get(&dt_obj->do_lu);
379         arg->object = dt_obj;
380         arg->u.attr_set.attr = *attr;
381         arg->reply = reply;
382         arg->index = index;
383         return 0;
384 }
385
386 static int out_attr_set(struct mdt_thread_info *info)
387 {
388         struct update           *update = info->mti_u.update.mti_update;
389         struct lu_attr          *attr = &info->mti_attr.ma_attr;
390         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
391         struct obdo             *lobdo = &info->mti_u.update.mti_obdo;
392         struct obdo             *wobdo;
393         int                     size;
394         int                     rc;
395
396         ENTRY;
397
398         wobdo = update_param_buf(update, 0, &size);
399         if (wobdo == NULL || size != sizeof(*wobdo)) {
400                 CERROR("%s: empty obdo in the update: rc = %d\n",
401                        mdt_obd_name(info->mti_mdt), -EPROTO);
402                 RETURN(err_serious(-EPROTO));
403         }
404
405         attr->la_valid = 0;
406         attr->la_valid = 0;
407         obdo_le_to_cpu(wobdo, wobdo);
408         lustre_get_wire_obdo(lobdo, wobdo);
409         la_from_obdo(attr, lobdo, lobdo->o_valid);
410
411         rc = out_tx_attr_set(info, obj, attr, &info->mti_handle,
412                              info->mti_u.update.mti_update_reply,
413                              info->mti_u.update.mti_update_reply_index);
414
415         RETURN(rc);
416 }
417
418 static int out_attr_get(struct mdt_thread_info *info)
419 {
420         struct obdo             *obdo = &info->mti_u.update.mti_obdo;
421         const struct lu_env     *env = info->mti_env;
422         struct lu_attr          *la = &info->mti_attr.ma_attr;
423         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
424         int                     rc;
425
426         ENTRY;
427
428         if (!lu_object_exists(&obj->do_lu))
429                 RETURN(-ENOENT);
430
431         dt_read_lock(env, obj, MOR_TGT_CHILD);
432         rc = dt_attr_get(env, obj, la, NULL);
433         if (rc)
434                 GOTO(out_unlock, rc);
435         /*
436          * If it is a directory, we will also check whether the
437          * directory is empty.
438          * la_flags = 0 : Empty.
439          *          = 1 : Not empty.
440          */
441         la->la_flags = 0;
442         if (S_ISDIR(la->la_mode)) {
443                 struct dt_it     *it;
444                 const struct dt_it_ops *iops;
445
446                 if (!dt_try_as_dir(env, obj))
447                         GOTO(out_unlock, rc = -ENOTDIR);
448
449                 iops = &obj->do_index_ops->dio_it;
450                 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
451                 if (!IS_ERR(it)) {
452                         int  result;
453                         result = iops->get(env, it, (const void *)"");
454                         if (result > 0) {
455                                 int i;
456                                 for (result = 0, i = 0; result == 0 && i < 3;
457                                      ++i)
458                                         result = iops->next(env, it);
459                                 if (result == 0)
460                                         la->la_flags = 1;
461                         } else if (result == 0)
462                                 /*
463                                  * Huh? Index contains no zero key?
464                                  */
465                                 rc = -EIO;
466
467                         iops->put(env, it);
468                         iops->fini(env, it);
469                 }
470         }
471
472         obdo->o_valid = 0;
473         obdo_from_la(obdo, la, la->la_valid);
474         obdo_cpu_to_le(obdo, obdo);
475         lustre_set_wire_obdo(obdo, obdo);
476
477 out_unlock:
478         dt_read_unlock(env, obj);
479         update_insert_reply(info->mti_u.update.mti_update_reply, obdo,
480                             sizeof(*obdo), 0, rc);
481         RETURN(rc);
482 }
483
484 static int out_xattr_get(struct mdt_thread_info *info)
485 {
486         struct update           *update = info->mti_u.update.mti_update;
487         const struct lu_env     *env = info->mti_env;
488         struct lu_buf           *lbuf = &info->mti_buf;
489         struct update_reply     *reply = info->mti_u.update.mti_update_reply;
490         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
491         char                    *name;
492         void                    *ptr;
493         int                     rc;
494
495         ENTRY;
496
497         name = (char *)update_param_buf(update, 0, NULL);
498         if (name == NULL) {
499                 CERROR("%s: empty name for xattr get: rc = %d\n",
500                        mdt_obd_name(info->mti_mdt), -EPROTO);
501                 RETURN(err_serious(-EPROTO));
502         }
503
504         ptr = update_get_buf_internal(reply, 0, NULL);
505         LASSERT(ptr != NULL);
506
507         /* The first 4 bytes(int) are used to store the result */
508         lbuf->lb_buf = (char *)ptr + sizeof(int);
509         lbuf->lb_len = UPDATE_BUFFER_SIZE - sizeof(struct update_reply);
510         dt_read_lock(env, obj, MOR_TGT_CHILD);
511         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
512         dt_read_unlock(env, obj);
513         if (rc < 0) {
514                 lbuf->lb_len = 0;
515                 GOTO(out, rc);
516         }
517         if (rc == 0) {
518                 lbuf->lb_len = 0;
519                 GOTO(out, rc = -ENOENT);
520         }
521         lbuf->lb_len = rc;
522         rc = 0;
523         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
524                mdt_obd_name(info->mti_mdt), PFID(lu_object_fid(&obj->do_lu)),
525                name, (int)lbuf->lb_len);
526 out:
527         *(int *)ptr = rc;
528         reply->ur_lens[0] = lbuf->lb_len + sizeof(int);
529         RETURN(rc);
530 }
531
532 static int out_index_lookup(struct mdt_thread_info *info)
533 {
534         struct update           *update = info->mti_u.update.mti_update;
535         const struct lu_env     *env = info->mti_env;
536         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
537         char                    *name;
538         int                     rc;
539
540         ENTRY;
541
542         if (!lu_object_exists(&obj->do_lu))
543                 RETURN(-ENOENT);
544
545         name = (char *)update_param_buf(update, 0, NULL);
546         if (name == NULL) {
547                 CERROR("%s: empty name for lookup: rc = %d\n",
548                        mdt_obd_name(info->mti_mdt), -EPROTO);
549                 RETURN(err_serious(-EPROTO));
550         }
551
552         dt_read_lock(env, obj, MOR_TGT_CHILD);
553         if (!dt_try_as_dir(env, obj))
554                 GOTO(out_unlock, rc = -ENOTDIR);
555
556         rc = dt_lookup(env, obj, (struct dt_rec *)&info->mti_tmp_fid1,
557                 (struct dt_key *)name, NULL);
558
559         if (rc < 0)
560                 GOTO(out_unlock, rc);
561
562         if (rc == 0)
563                 rc += 1;
564
565         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
566                PFID(lu_object_fid(&obj->do_lu)), name,
567                PFID(&info->mti_tmp_fid1), rc);
568         fid_cpu_to_le(&info->mti_tmp_fid1, &info->mti_tmp_fid1);
569
570 out_unlock:
571         dt_read_unlock(env, obj);
572         update_insert_reply(info->mti_u.update.mti_update_reply,
573                             &info->mti_tmp_fid1, sizeof(info->mti_tmp_fid1),
574                             0, rc);
575         RETURN(rc);
576 }
577
578 static int out_tx_xattr_set_exec(struct mdt_thread_info *info,
579                                  struct thandle *th,
580                                  struct tx_arg *arg)
581 {
582         struct dt_object *dt_obj = arg->object;
583         int rc;
584
585         LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
586
587         CDEBUG(D_OTHER, "attr set "DFID"\n",
588                PFID(lu_object_fid(&arg->object->do_lu)));
589
590         dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
591         rc = dt_xattr_set(info->mti_env, dt_obj, &arg->u.xattr_set.buf,
592                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
593                           th, NULL);
594         dt_write_unlock(info->mti_env, dt_obj);
595         /**
596          * Ignore errors if this is LINK EA
597          **/
598         if (unlikely(rc && !strncmp(arg->u.xattr_set.name, XATTR_NAME_LINK,
599                                     strlen(XATTR_NAME_LINK))))
600                 rc = 0;
601
602         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
603
604         CDEBUG(D_INFO, "set xattr buf %p name %s flag %d\n",
605                arg->u.xattr_set.buf.lb_buf, arg->u.xattr_set.name,
606                arg->u.xattr_set.flags);
607
608         return rc;
609 }
610
611 static int __out_tx_xattr_set(struct mdt_thread_info *info,
612                               struct dt_object *dt_obj,
613                               const struct lu_buf *buf,
614                               const char *name, int flags,
615                               struct thandle_exec_args *th,
616                               struct update_reply *reply, int index,
617                               char *file, int line)
618 {
619         struct tx_arg           *arg;
620
621         LASSERT(th->ta_handle != NULL);
622         th->ta_err = dt_declare_xattr_set(info->mti_env, dt_obj, buf, name,
623                                           flags, th->ta_handle);
624         if (th->ta_err != 0)
625                 return th->ta_err;
626
627         arg = tx_add_exec(th, out_tx_xattr_set_exec, NULL, file, line);
628         LASSERT(arg);
629         lu_object_get(&dt_obj->do_lu);
630         arg->object = dt_obj;
631         arg->u.xattr_set.name = name;
632         arg->u.xattr_set.flags = flags;
633         arg->u.xattr_set.buf = *buf;
634         arg->reply = reply;
635         arg->index = index;
636         arg->u.xattr_set.csum = 0;
637         return 0;
638 }
639
640 static int out_xattr_set(struct mdt_thread_info *info)
641 {
642         struct update           *update = info->mti_u.update.mti_update;
643         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
644         struct lu_buf           *lbuf = &info->mti_buf;
645         char                    *name;
646         char                    *buf;
647         char                    *tmp;
648         int                     buf_len = 0;
649         int                     flag;
650         int                     rc;
651         ENTRY;
652
653         name = update_param_buf(update, 0, NULL);
654         if (name == NULL) {
655                 CERROR("%s: empty name for xattr set: rc = %d\n",
656                        mdt_obd_name(info->mti_mdt), -EPROTO);
657                 RETURN(err_serious(-EPROTO));
658         }
659
660         buf = (char *)update_param_buf(update, 1, &buf_len);
661         if (buf == NULL || buf_len == 0) {
662                 CERROR("%s: empty buf for xattr set: rc = %d\n",
663                        mdt_obd_name(info->mti_mdt), -EPROTO);
664                 RETURN(err_serious(-EPROTO));
665         }
666
667         lbuf->lb_buf = buf;
668         lbuf->lb_len = buf_len;
669
670         tmp = (char *)update_param_buf(update, 2, NULL);
671         if (tmp == NULL) {
672                 CERROR("%s: empty flag for xattr set: rc = %d\n",
673                        mdt_obd_name(info->mti_mdt), -EPROTO);
674                 RETURN(err_serious(-EPROTO));
675         }
676
677         flag = le32_to_cpu(*(int *)tmp);
678
679         rc = out_tx_xattr_set(info, obj, lbuf, name, flag, &info->mti_handle,
680                          info->mti_u.update.mti_update_reply,
681                          info->mti_u.update.mti_update_reply_index);
682
683         RETURN(rc);
684 }
685
686 static int out_tx_ref_add_exec(struct mdt_thread_info *info, struct thandle *th,
687                                struct tx_arg *arg)
688 {
689         struct dt_object *dt_obj = arg->object;
690
691         LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
692
693         CDEBUG(D_OTHER, "ref add "DFID"\n",
694                PFID(lu_object_fid(&arg->object->do_lu)));
695
696         dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
697         dt_ref_add(info->mti_env, dt_obj, th);
698         dt_write_unlock(info->mti_env, dt_obj);
699
700         update_insert_reply(arg->reply, NULL, 0, arg->index, 0);
701         return 0;
702 }
703
704 static int out_tx_ref_add_undo(struct mdt_thread_info *info, struct thandle *th,
705                                struct tx_arg *arg)
706 {
707         struct dt_object *dt_obj = arg->object;
708
709         LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
710
711         CDEBUG(D_OTHER, "ref del "DFID"\n",
712                PFID(lu_object_fid(&arg->object->do_lu)));
713
714         dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
715         dt_ref_del(info->mti_env, dt_obj, th);
716         dt_write_unlock(info->mti_env, dt_obj);
717
718         return 0;
719 }
720
721 static int __out_tx_ref_add(struct mdt_thread_info *info,
722                             struct dt_object *dt_obj,
723                             struct thandle_exec_args *th,
724                             struct update_reply *reply,
725                             int index, char *file, int line)
726 {
727         struct tx_arg           *arg;
728
729         LASSERT(th->ta_handle != NULL);
730         th->ta_err = dt_declare_ref_add(info->mti_env, dt_obj,
731                                         th->ta_handle);
732         if (th->ta_err != 0)
733                 return th->ta_err;
734
735         arg = tx_add_exec(th, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
736                           line);
737         LASSERT(arg);
738         lu_object_get(&dt_obj->do_lu);
739         arg->object = dt_obj;
740         arg->reply = reply;
741         arg->index = index;
742         return 0;
743 }
744
745 /**
746  * increase ref of the object
747  **/
748 static int out_ref_add(struct mdt_thread_info *info)
749 {
750         struct dt_object  *obj = info->mti_u.update.mti_dt_object;
751         int               rc;
752
753         ENTRY;
754
755         rc = out_tx_ref_add(info, obj, &info->mti_handle,
756                             info->mti_u.update.mti_update_reply,
757                             info->mti_u.update.mti_update_reply_index);
758         RETURN(rc);
759 }
760
761 static int out_tx_ref_del_exec(struct mdt_thread_info *info, struct thandle *th,
762                                struct tx_arg *arg)
763 {
764         out_tx_ref_add_undo(info, th, arg);
765         update_insert_reply(arg->reply, NULL, 0, arg->index, 0);
766         return 0;
767 }
768
769 static int out_tx_ref_del_undo(struct mdt_thread_info *info, struct thandle *th,
770                                struct tx_arg *arg)
771 {
772         return out_tx_ref_add_exec(info, th, arg);
773 }
774
775 static int __out_tx_ref_del(struct mdt_thread_info *info,
776                             struct dt_object *dt_obj,
777                             struct thandle_exec_args *th,
778                             struct update_reply *reply,
779                             int index, char *file, int line)
780 {
781         struct tx_arg           *arg;
782
783         LASSERT(th->ta_handle != NULL);
784         th->ta_err = dt_declare_ref_del(info->mti_env, dt_obj, th->ta_handle);
785         if (th->ta_err != 0)
786                 return th->ta_err;
787
788         arg = tx_add_exec(th, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
789                           line);
790         LASSERT(arg);
791         lu_object_get(&dt_obj->do_lu);
792         arg->object = dt_obj;
793         arg->reply = reply;
794         arg->index = index;
795         return 0;
796 }
797
798 static int out_ref_del(struct mdt_thread_info *info)
799 {
800         struct dt_object  *obj = info->mti_u.update.mti_dt_object;
801         int               rc;
802
803         ENTRY;
804
805         if (!lu_object_exists(&obj->do_lu))
806                 RETURN(-ENOENT);
807
808         rc = out_tx_ref_del(info, obj, &info->mti_handle,
809                             info->mti_u.update.mti_update_reply,
810                             info->mti_u.update.mti_update_reply_index);
811         RETURN(rc);
812 }
813
814 static int out_tx_index_insert_exec(struct mdt_thread_info *info,
815                                     struct thandle *th, struct tx_arg *arg)
816 {
817         struct dt_object *dt_obj = arg->object;
818         int rc;
819
820         LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
821
822         CDEBUG(D_OTHER, "index insert "DFID" name: %s fid "DFID"\n",
823                PFID(lu_object_fid(&arg->object->do_lu)),
824                (char *)arg->u.insert.key,
825                PFID((struct lu_fid *)arg->u.insert.rec));
826
827         if (dt_try_as_dir(info->mti_env, dt_obj) == 0)
828                 return -ENOTDIR;
829
830         dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
831         rc = dt_insert(info->mti_env, dt_obj, arg->u.insert.rec,
832                        arg->u.insert.key, th, NULL, 0);
833         dt_write_unlock(info->mti_env, dt_obj);
834
835         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
836
837         return rc;
838 }
839
840 static int out_tx_index_insert_undo(struct mdt_thread_info *info,
841                                     struct thandle *th, struct tx_arg *arg)
842 {
843         struct dt_object *dt_obj = arg->object;
844         int rc;
845
846         LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
847
848         CDEBUG(D_OTHER, "index delete "DFID" name: %s\n",
849                PFID(lu_object_fid(&arg->object->do_lu)),
850                (char *)arg->u.insert.key);
851
852         if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
853                 CERROR("%s: "DFID" is not directory: rc = %d\n",
854                        mdt_obd_name(info->mti_mdt),
855                        PFID(lu_object_fid(&dt_obj->do_lu)), -ENOTDIR);
856                 return -ENOTDIR;
857         }
858
859         dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
860         rc = dt_delete(info->mti_env, dt_obj, arg->u.insert.key, th, NULL);
861         dt_write_unlock(info->mti_env, dt_obj);
862
863         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
864
865         return rc;
866 }
867
868 static int __out_tx_index_insert(struct mdt_thread_info *info,
869                                  struct dt_object *dt_obj,
870                                  char *name, struct lu_fid *fid,
871                                  struct thandle_exec_args *th,
872                                  struct update_reply *reply,
873                                  int index, char *file, int line)
874 {
875         struct tx_arg *arg;
876
877         LASSERT(th->ta_handle != NULL);
878
879         if (lu_object_exists(&dt_obj->do_lu)) {
880                 if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
881                         th->ta_err = -ENOTDIR;
882                         return th->ta_err;
883                 }
884                 th->ta_err = dt_declare_insert(info->mti_env, dt_obj,
885                                                (struct dt_rec *)fid,
886                                                (struct dt_key *)name,
887                                                th->ta_handle);
888         }
889
890         if (th->ta_err != 0)
891                 return th->ta_err;
892
893         arg = tx_add_exec(th, out_tx_index_insert_exec,
894                           out_tx_index_insert_undo, file,
895                           line);
896         LASSERT(arg);
897         lu_object_get(&dt_obj->do_lu);
898         arg->object = dt_obj;
899         arg->reply = reply;
900         arg->index = index;
901         arg->u.insert.rec = (struct dt_rec *)fid;
902         arg->u.insert.key = (struct dt_key *)name;
903
904         return 0;
905 }
906
907 static int out_index_insert(struct mdt_thread_info *info)
908 {
909         struct update     *update = info->mti_u.update.mti_update;
910         struct dt_object  *obj = info->mti_u.update.mti_dt_object;
911         struct lu_fid     *fid;
912         char              *name;
913         int               rc = 0;
914         int               size;
915         ENTRY;
916
917         name = (char *)update_param_buf(update, 0, NULL);
918         if (name == NULL) {
919                 CERROR("%s: empty name for index insert: rc = %d\n",
920                        mdt_obd_name(info->mti_mdt), -EPROTO);
921                 RETURN(err_serious(-EPROTO));
922         }
923
924         fid = (struct lu_fid *)update_param_buf(update, 1, &size);
925         if (fid == NULL || size != sizeof(*fid)) {
926                 CERROR("%s: invalid fid: rc = %d\n",
927                        mdt_obd_name(info->mti_mdt), -EPROTO);
928                        RETURN(err_serious(-EPROTO));
929         }
930
931         fid_le_to_cpu(fid, fid);
932         if (!fid_is_sane(fid)) {
933                 CERROR("%s: invalid FID "DFID": rc = %d\n",
934                        mdt_obd_name(info->mti_mdt), PFID(fid),
935                        -EPROTO);
936                 RETURN(err_serious(-EPROTO));
937         }
938
939         rc = out_tx_index_insert(info, obj, name, fid, &info->mti_handle,
940                                  info->mti_u.update.mti_update_reply,
941                                  info->mti_u.update.mti_update_reply_index);
942         RETURN(rc);
943 }
944
945 static int out_tx_index_delete_exec(struct mdt_thread_info *info,
946                                     struct thandle *th,
947                                     struct tx_arg *arg)
948 {
949         return out_tx_index_insert_undo(info, th, arg);
950 }
951
952 static int out_tx_index_delete_undo(struct mdt_thread_info *info,
953                                     struct thandle *th,
954                                     struct tx_arg *arg)
955 {
956         return out_tx_index_insert_exec(info, th, arg);
957 }
958
959 static int __out_tx_index_delete(struct mdt_thread_info *info,
960                                  struct dt_object *dt_obj, char *name,
961                                  struct thandle_exec_args *th,
962                                  struct update_reply *reply,
963                                  int index, char *file, int line)
964 {
965         struct tx_arg *arg;
966
967         if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
968                 th->ta_err = -ENOTDIR;
969                 return th->ta_err;
970         }
971
972         LASSERT(th->ta_handle != NULL);
973         th->ta_err = dt_declare_delete(info->mti_env, dt_obj,
974                                        (struct dt_key *)name,
975                                        th->ta_handle);
976         if (th->ta_err != 0)
977                 return th->ta_err;
978
979         arg = tx_add_exec(th, out_tx_index_delete_exec,
980                           out_tx_index_delete_undo, file,
981                           line);
982         LASSERT(arg);
983         lu_object_get(&dt_obj->do_lu);
984         arg->object = dt_obj;
985         arg->reply = reply;
986         arg->index = index;
987         arg->u.insert.key = (struct dt_key *)name;
988         return 0;
989 }
990
991 static int out_index_delete(struct mdt_thread_info *info)
992 {
993         struct update           *update = info->mti_u.update.mti_update;
994         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
995         char                    *name;
996         int                     rc = 0;
997
998         if (!lu_object_exists(&obj->do_lu))
999                 RETURN(-ENOENT);
1000         name = (char *)update_param_buf(update, 0, NULL);
1001         if (name == NULL) {
1002                 CERROR("%s: empty name for index delete: rc = %d\n",
1003                        mdt_obd_name(info->mti_mdt), -EPROTO);
1004                 RETURN(err_serious(-EPROTO));
1005         }
1006
1007         rc = out_tx_index_delete(info, obj, name, &info->mti_handle,
1008                                  info->mti_u.update.mti_update_reply,
1009                                  info->mti_u.update.mti_update_reply_index);
1010         RETURN(rc);
1011 }
1012
1013 #define DEF_OUT_HNDL(opc, name, fail_id, flags, fn)     \
1014 [opc - OBJ_CREATE] = {                                  \
1015         .mh_name    = name,                             \
1016         .mh_fail_id = fail_id,                          \
1017         .mh_opc     = opc,                              \
1018         .mh_flags   = flags,                            \
1019         .mh_act     = fn,                               \
1020         .mh_fmt     = NULL                              \
1021 }
1022
1023 #define out_handler mdt_handler
1024 static struct out_handler out_update_ops[] = {
1025         DEF_OUT_HNDL(OBJ_CREATE, "obj_create", 0, MUTABOR | HABEO_REFERO,
1026                      out_create),
1027         DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", 0, MUTABOR | HABEO_REFERO,
1028                      out_ref_add),
1029         DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", 0, MUTABOR | HABEO_REFERO,
1030                      out_ref_del),
1031         DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set", 0,  MUTABOR | HABEO_REFERO,
1032                      out_attr_set),
1033         DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get", 0,  HABEO_REFERO,
1034                      out_attr_get),
1035         DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", 0, MUTABOR | HABEO_REFERO,
1036                      out_xattr_set),
1037         DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", 0, HABEO_REFERO,
1038                      out_xattr_get),
1039         DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", 0,
1040                      HABEO_REFERO, out_index_lookup),
1041         DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert", 0,
1042                      MUTABOR | HABEO_REFERO, out_index_insert),
1043         DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete", 0,
1044                      MUTABOR | HABEO_REFERO, out_index_delete),
1045 };
1046
1047 #define out_opc_slice mdt_opc_slice
1048 static struct out_opc_slice out_handlers[] = {
1049         {
1050                 .mos_opc_start = OBJ_CREATE,
1051                 .mos_opc_end   = OBJ_LAST,
1052                 .mos_hs = out_update_ops
1053         },
1054 };
1055
1056 /**
1057  * Object updates between Targets. Because all the updates has been
1058  * dis-assemblied into object updates in master MDD layer, so out
1059  * will skip MDD layer, and call OSD API directly to execute these
1060  * updates.
1061  *
1062  * In phase I, all of the updates in the request need to be executed
1063  * in one transaction, and the transaction has to be synchronously.
1064  *
1065  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1066  * format.
1067  */
1068 int out_handle(struct mdt_thread_info *info)
1069 {
1070         struct req_capsule        *pill = info->mti_pill;
1071         struct update_buf         *ubuf;
1072         struct update             *update;
1073         struct thandle_exec_args  *th = &info->mti_handle;
1074         int                       bufsize;
1075         int                       count;
1076         unsigned                  off;
1077         int                       i;
1078         int                       rc = 0;
1079         int                       rc1 = 0;
1080         ENTRY;
1081
1082         req_capsule_set(pill, &RQF_UPDATE_OBJ);
1083         bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
1084         if (bufsize != UPDATE_BUFFER_SIZE) {
1085                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1086                        mdt_obd_name(info->mti_mdt), bufsize, -EPROTO);
1087                 RETURN(err_serious(-EPROTO));
1088         }
1089
1090         ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
1091         if (ubuf == NULL) {
1092                 CERROR("%s: No buf!: rc = %d\n", mdt_obd_name(info->mti_mdt),
1093                        -EPROTO);
1094                 RETURN(err_serious(-EPROTO));
1095         }
1096
1097         if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
1098                 CERROR("%s: invalid magic %x expect %x: rc = %d\n",
1099                        mdt_obd_name(info->mti_mdt), le32_to_cpu(ubuf->ub_magic),
1100                        UPDATE_BUFFER_MAGIC, -EPROTO);
1101                 RETURN(err_serious(-EPROTO));
1102         }
1103
1104         count = le32_to_cpu(ubuf->ub_count);
1105         if (count <= 0) {
1106                 CERROR("%s: No update!: rc = %d\n",
1107                        mdt_obd_name(info->mti_mdt), -EPROTO);
1108                 RETURN(err_serious(-EPROTO));
1109         }
1110
1111         req_capsule_set_size(pill, &RMF_UPDATE_REPLY, RCL_SERVER,
1112                              UPDATE_BUFFER_SIZE);
1113         rc = req_capsule_server_pack(pill);
1114         if (rc != 0) {
1115                 CERROR("%s: Can't pack response: rc = %d\n",
1116                        mdt_obd_name(info->mti_mdt), rc);
1117                 RETURN(rc);
1118         }
1119
1120         /* Prepare the update reply buffer */
1121         info->mti_u.update.mti_update_reply =
1122                         req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
1123         update_init_reply_buf(info->mti_u.update.mti_update_reply, count);
1124
1125         rc = out_tx_start(info->mti_env, info->mti_mdt, th);
1126         if (rc != 0)
1127                 RETURN(rc);
1128
1129         /* Walk through updates in the request to execute them synchronously */
1130         off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
1131         for (i = 0; i < count; i++) {
1132                 struct out_handler *h;
1133                 struct dt_object   *dt_obj;
1134
1135                 update = (struct update *)((char *)ubuf + off);
1136
1137                 fid_le_to_cpu(&update->u_fid, &update->u_fid);
1138                 if (!fid_is_sane(&update->u_fid)) {
1139                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1140                                mdt_obd_name(info->mti_mdt),
1141                                PFID(&update->u_fid), -EPROTO);
1142                         GOTO(out, rc = err_serious(-EPROTO));
1143                 }
1144                 dt_obj = out_object_find(info, &update->u_fid);
1145                 if (IS_ERR(dt_obj))
1146                         GOTO(out, rc = PTR_ERR(dt_obj));
1147
1148                 info->mti_u.update.mti_dt_object = dt_obj;
1149                 info->mti_u.update.mti_update = update;
1150                 info->mti_u.update.mti_update_reply_index = i;
1151
1152                 h = mdt_handler_find(update->u_type, out_handlers);
1153                 if (likely(h != NULL)) {
1154                         rc = h->mh_act(info);
1155                 } else {
1156                         CERROR("%s: The unsupported opc: 0x%x\n",
1157                                mdt_obd_name(info->mti_mdt), update->u_type);
1158                         lu_object_put(info->mti_env, &dt_obj->do_lu);
1159                         GOTO(out, rc = -ENOTSUPP);
1160                 }
1161                 lu_object_put(info->mti_env, &dt_obj->do_lu);
1162                 if (rc < 0)
1163                         GOTO(out, rc);
1164                 off += cfs_size_round(update_size(update));
1165         }
1166
1167 out:
1168         rc1 = out_tx_end(info, th);
1169         rc = rc == 0 ? rc1 : rc;
1170         info->mti_fail_id = OBD_FAIL_UPDATE_OBJ_NET;
1171         RETURN(rc);
1172 }