Whamcloud - gitweb
b=24037 Changes of 2.6.32 kernel.
[fs/lustre-release.git] / lustre / cmm / cmm_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_object.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <lustre_fid.h>
50 #include "cmm_internal.h"
51 #include "mdc_internal.h"
52 /**
53  * \ingroup cmm
54  * Lookup MDS number \a mds by FID \a fid.
55  *
56  * \param fid FID of object to find MDS
57  * \param mds mds number to return.
58  */
59 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
60                    mdsno_t *mds, const struct lu_env *env)
61 {
62         int rc = 0;
63         ENTRY;
64
65         LASSERT(fid_is_sane(fid));
66
67         rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds,
68                                LU_SEQ_RANGE_MDT, env);
69         if (rc) {
70                 CERROR("Can't find mds by seq "LPX64", rc %d\n",
71                        fid_seq(fid), rc);
72                 RETURN(rc);
73         }
74
75         if (*mds > cm->cmm_tgt_count) {
76                 CERROR("Got invalid mdsno: %x (max: %x)\n",
77                        *mds, cm->cmm_tgt_count);
78                 rc = -EINVAL;
79         } else {
80                 CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
81                        LPX64"\n", *mds, fid_seq(fid));
82         }
83
84         RETURN (rc);
85 }
86
87 /**
88  * \addtogroup cml
89  * @{
90  */
91 static const struct md_object_operations cml_mo_ops;
92 static const struct md_dir_operations    cml_dir_ops;
93 static const struct lu_object_operations cml_obj_ops;
94
95 static const struct md_object_operations cmr_mo_ops;
96 static const struct md_dir_operations    cmr_dir_ops;
97 static const struct lu_object_operations cmr_obj_ops;
98
99 /**
100  * \ingroup cmm
101  * Allocate CMM object.
102  */
103 struct lu_object *cmm_object_alloc(const struct lu_env *env,
104                                    const struct lu_object_header *loh,
105                                    struct lu_device *ld)
106 {
107         const struct lu_fid *fid = &loh->loh_fid;
108         struct lu_object  *lo = NULL;
109         struct cmm_device *cd;
110         mdsno_t mds;
111         int rc = 0;
112
113         ENTRY;
114
115         cd = lu2cmm_dev(ld);
116         if (cd->cmm_flags & CMM_INITIALIZED) {
117                 /* get object location */
118                 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
119                 if (rc)
120                         RETURN(NULL);
121         } else
122                 /*
123                  * Device is not yet initialized, cmm_object is being created
124                  * as part of early bootstrap procedure (it is /ROOT, or /fld,
125                  * etc.). Such object *has* to be local.
126                  */
127                 mds = cd->cmm_local_num;
128
129         /* select the proper set of operations based on object location */
130         if (mds == cd->cmm_local_num) {
131                 struct cml_object *clo;
132
133                 OBD_ALLOC_PTR(clo);
134                 if (clo != NULL) {
135                         lo = &clo->cmm_obj.cmo_obj.mo_lu;
136                         lu_object_init(lo, NULL, ld);
137                         clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
138                         clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
139                         lo->lo_ops = &cml_obj_ops;
140                 }
141         } else {
142                 struct cmr_object *cro;
143
144                 OBD_ALLOC_PTR(cro);
145                 if (cro != NULL) {
146                         lo = &cro->cmm_obj.cmo_obj.mo_lu;
147                         lu_object_init(lo, NULL, ld);
148                         cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
149                         cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
150                         lo->lo_ops = &cmr_obj_ops;
151                         cro->cmo_num = mds;
152                 }
153         }
154         RETURN(lo);
155 }
156
157 /**
158  * Get local child device.
159  */
160 static struct lu_device *cml_child_dev(struct cmm_device *d)
161 {
162         return &d->cmm_child->md_lu_dev;
163 }
164
165 /**
166  * Free cml_object.
167  */
168 static void cml_object_free(const struct lu_env *env,
169                             struct lu_object *lo)
170 {
171         struct cml_object *clo = lu2cml_obj(lo);
172         lu_object_fini(lo);
173         OBD_FREE_PTR(clo);
174 }
175
176 /**
177  * Initialize cml_object.
178  */
179 static int cml_object_init(const struct lu_env *env, struct lu_object *lo,
180                            const struct lu_object_conf *unused)
181 {
182         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
183         struct lu_device  *c_dev;
184         struct lu_object  *c_obj;
185         int rc;
186
187         ENTRY;
188
189 #ifdef HAVE_SPLIT_SUPPORT
190         if (cd->cmm_tgt_count == 0)
191                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
192         else
193                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
194 #endif
195         c_dev = cml_child_dev(cd);
196         if (c_dev == NULL) {
197                 rc = -ENOENT;
198         } else {
199                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
200                                                         lo->lo_header, c_dev);
201                 if (c_obj != NULL) {
202                         lu_object_add(lo, c_obj);
203                         rc = 0;
204                 } else {
205                         rc = -ENOMEM;
206                 }
207         }
208
209         RETURN(rc);
210 }
211
212 static int cml_object_print(const struct lu_env *env, void *cookie,
213                             lu_printer_t p, const struct lu_object *lo)
214 {
215         return (*p)(env, cookie, "[local]");
216 }
217
218 static const struct lu_object_operations cml_obj_ops = {
219         .loo_object_init    = cml_object_init,
220         .loo_object_free    = cml_object_free,
221         .loo_object_print   = cml_object_print
222 };
223
224 /**
225  * \name CMM local md_object operations.
226  * All of them call just corresponding operations on next layer.
227  * @{
228  */
229 static int cml_object_create(const struct lu_env *env,
230                              struct md_object *mo,
231                              const struct md_op_spec *spec,
232                              struct md_attr *attr)
233 {
234         int rc;
235         ENTRY;
236         rc = mo_object_create(env, md_object_next(mo), spec, attr);
237         RETURN(rc);
238 }
239
240 static int cml_permission(const struct lu_env *env,
241                           struct md_object *p, struct md_object *c,
242                           struct md_attr *attr, int mask)
243 {
244         int rc;
245         ENTRY;
246         rc = mo_permission(env, md_object_next(p), md_object_next(c),
247                            attr, mask);
248         RETURN(rc);
249 }
250
251 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
252                         struct md_attr *attr)
253 {
254         int rc;
255         ENTRY;
256         rc = mo_attr_get(env, md_object_next(mo), attr);
257         RETURN(rc);
258 }
259
260 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
261                         const struct md_attr *attr)
262 {
263         int rc;
264         ENTRY;
265         rc = mo_attr_set(env, md_object_next(mo), attr);
266         RETURN(rc);
267 }
268
269 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
270                          struct lu_buf *buf, const char *name)
271 {
272         int rc;
273         ENTRY;
274         rc = mo_xattr_get(env, md_object_next(mo), buf, name);
275         RETURN(rc);
276 }
277
278 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
279                         struct lu_buf *buf)
280 {
281         int rc;
282         ENTRY;
283         rc = mo_readlink(env, md_object_next(mo), buf);
284         RETURN(rc);
285 }
286
287 static int cml_changelog(const struct lu_env *env, enum changelog_rec_type type,
288                          int flags, struct md_object *mo)
289 {
290         int rc;
291         ENTRY;
292         rc = mo_changelog(env, type, flags, md_object_next(mo));
293         RETURN(rc);
294 }
295
296 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
297                           struct lu_buf *buf)
298 {
299         int rc;
300         ENTRY;
301         rc = mo_xattr_list(env, md_object_next(mo), buf);
302         RETURN(rc);
303 }
304
305 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
306                          const struct lu_buf *buf, const char *name,
307                          int fl)
308 {
309         int rc;
310         ENTRY;
311         rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
312         RETURN(rc);
313 }
314
315 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
316                          const char *name)
317 {
318         int rc;
319         ENTRY;
320         rc = mo_xattr_del(env, md_object_next(mo), name);
321         RETURN(rc);
322 }
323
324 static int cml_ref_add(const struct lu_env *env, struct md_object *mo,
325                        const struct md_attr *ma)
326 {
327         int rc;
328         ENTRY;
329         rc = mo_ref_add(env, md_object_next(mo), ma);
330         RETURN(rc);
331 }
332
333 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
334                        struct md_attr *ma)
335 {
336         int rc;
337         ENTRY;
338         rc = mo_ref_del(env, md_object_next(mo), ma);
339         RETURN(rc);
340 }
341
342 static int cml_open(const struct lu_env *env, struct md_object *mo,
343                     int flags)
344 {
345         int rc;
346         ENTRY;
347         rc = mo_open(env, md_object_next(mo), flags);
348         RETURN(rc);
349 }
350
351 static int cml_close(const struct lu_env *env, struct md_object *mo,
352                      struct md_attr *ma)
353 {
354         int rc;
355         ENTRY;
356         rc = mo_close(env, md_object_next(mo), ma);
357         RETURN(rc);
358 }
359
360 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
361                         const struct lu_rdpg *rdpg)
362 {
363         int rc;
364         ENTRY;
365         rc = mo_readpage(env, md_object_next(mo), rdpg);
366         RETURN(rc);
367 }
368
369 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
370                         struct lustre_capa *capa, int renewal)
371 {
372         int rc;
373         ENTRY;
374         rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
375         RETURN(rc);
376 }
377
378 static int cml_path(const struct lu_env *env, struct md_object *mo,
379                     char *path, int pathlen, __u64 *recno, int *linkno)
380 {
381         int rc;
382         ENTRY;
383         rc = mo_path(env, md_object_next(mo), path, pathlen, recno, linkno);
384         RETURN(rc);
385 }
386
387 static int cml_file_lock(const struct lu_env *env, struct md_object *mo,
388                          struct lov_mds_md *lmm, struct ldlm_extent *extent,
389                          struct lustre_handle *lockh)
390 {
391         int rc;
392         ENTRY;
393         rc = mo_file_lock(env, md_object_next(mo), lmm, extent, lockh);
394         RETURN(rc);
395 }
396
397 static int cml_file_unlock(const struct lu_env *env, struct md_object *mo,
398                            struct lov_mds_md *lmm, struct lustre_handle *lockh)
399 {
400         int rc;
401         ENTRY;
402         rc = mo_file_unlock(env, md_object_next(mo), lmm, lockh);
403         RETURN(rc);
404 }
405
406 static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
407 {
408         int rc;
409         ENTRY;
410         rc = mo_object_sync(env, md_object_next(mo));
411         RETURN(rc);
412 }
413
414 static dt_obj_version_t cml_version_get(const struct lu_env *env,
415                                         struct md_object *mo)
416 {
417         return mo_version_get(env, md_object_next(mo));
418 }
419
420 static void cml_version_set(const struct lu_env *env, struct md_object *mo,
421                             dt_obj_version_t version)
422 {
423         return mo_version_set(env, md_object_next(mo), version);
424 }
425
426 static const struct md_object_operations cml_mo_ops = {
427         .moo_permission    = cml_permission,
428         .moo_attr_get      = cml_attr_get,
429         .moo_attr_set      = cml_attr_set,
430         .moo_xattr_get     = cml_xattr_get,
431         .moo_xattr_list    = cml_xattr_list,
432         .moo_xattr_set     = cml_xattr_set,
433         .moo_xattr_del     = cml_xattr_del,
434         .moo_object_create = cml_object_create,
435         .moo_ref_add       = cml_ref_add,
436         .moo_ref_del       = cml_ref_del,
437         .moo_open          = cml_open,
438         .moo_close         = cml_close,
439         .moo_readpage      = cml_readpage,
440         .moo_readlink      = cml_readlink,
441         .moo_changelog     = cml_changelog,
442         .moo_capa_get      = cml_capa_get,
443         .moo_object_sync   = cml_object_sync,
444         .moo_version_get   = cml_version_get,
445         .moo_version_set   = cml_version_set,
446         .moo_path          = cml_path,
447         .moo_file_lock     = cml_file_lock,
448         .moo_file_unlock   = cml_file_unlock,
449 };
450 /** @} */
451
452 /**
453  * \name CMM local md_dir_operations.
454  * @{
455  */
456 /**
457  * cml lookup object fid by name.
458  * This returns only FID by name.
459  */
460 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
461                       const struct lu_name *lname, struct lu_fid *lf,
462                       struct md_op_spec *spec)
463 {
464         int rc;
465         ENTRY;
466
467 #ifdef HAVE_SPLIT_SUPPORT
468         if (spec != NULL && spec->sp_ck_split) {
469                 rc = cmm_split_check(env, mo_p, lname->ln_name);
470                 if (rc)
471                         RETURN(rc);
472         }
473 #endif
474         rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec);
475         RETURN(rc);
476
477 }
478
479 /**
480  * Helper to return lock mode. Used in split cases only.
481  */
482 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
483                                 struct md_object *mo, mdl_mode_t lm)
484 {
485         int rc = MDL_MINMODE;
486         ENTRY;
487
488 #ifdef HAVE_SPLIT_SUPPORT
489         rc = cmm_split_access(env, mo, lm);
490 #endif
491
492         RETURN(rc);
493 }
494
495 /**
496  * Create operation for cml.
497  * Objects are local, but split can happen.
498  * If split is not needed this will call next layer mdo_create().
499  *
500  * \param mo_p Parent directory. Local object.
501  * \param lname name of file to create.
502  * \param mo_c Child object. It has no real inode yet.
503  * \param spec creation specification.
504  * \param ma child object attributes.
505  */
506 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
507                       const struct lu_name *lname, struct md_object *mo_c,
508                       struct md_op_spec *spec, struct md_attr *ma)
509 {
510         int rc;
511         ENTRY;
512
513 #ifdef HAVE_SPLIT_SUPPORT
514         /* Lock mode always should be sane. */
515         LASSERT(spec->sp_cr_mode != MDL_MINMODE);
516
517         /*
518          * Sigh... This is long story. MDT may have race with detecting if split
519          * is possible in cmm. We know this race and let it live, because
520          * getting it rid (with some sem or spinlock) will also mean that
521          * PDIROPS for create will not work because we kill parallel work, what
522          * is really bad for performance and makes no sense having PDIROPS. So,
523          * we better allow the race to live, but split dir only if some of
524          * concurrent threads takes EX lock, not matter which one. So that, say,
525          * two concurrent threads may have different lock modes on directory (CW
526          * and EX) and not first one which comes here and see that split is
527          * possible should split the dir, but only that one which has EX
528          * lock. And we do not care that in this case, split may happen a bit
529          * later (when dir size will not be necessarily 64K, but may be a bit
530          * larger). So that, we allow concurrent creates and protect split by EX
531          * lock.
532          */
533         if (spec->sp_cr_mode == MDL_EX) {
534                 /**
535                  * Split cases:
536                  * - Try to split \a mo_p upon each create operation.
537                  *   If split is ok, -ERESTART is returned and current thread
538                  *   will not peoceed with create. Instead it sends -ERESTART
539                  *   to client to let it know that correct MDT must be chosen.
540                  * \see cmm_split_dir()
541                  */
542                 rc = cmm_split_dir(env, mo_p);
543                 if (rc)
544                         /*
545                          * -ERESTART or some split error is returned, we can't
546                          * proceed with create.
547                          */
548                         GOTO(out, rc);
549         }
550
551         if (spec != NULL && spec->sp_ck_split) {
552                 /**
553                  * - Directory is split already. Let the caller know that
554                  * it should tell client that directory is split and operation
555                  * should repeat to correct MDT.
556                  * \see cmm_split_check()
557                  */
558                 rc = cmm_split_check(env, mo_p, lname->ln_name);
559                 if (rc)
560                         GOTO(out, rc);
561         }
562 #endif
563
564         rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c),
565                         spec, ma);
566
567         EXIT;
568 #ifdef HAVE_SPLIT_SUPPORT
569 out:
570 #endif
571         return rc;
572 }
573
574 /** Call mdo_create_data() on next layer. All objects are local. */
575 static int cml_create_data(const struct lu_env *env, struct md_object *p,
576                            struct md_object *o,
577                            const struct md_op_spec *spec,
578                            struct md_attr *ma)
579 {
580         int rc;
581         ENTRY;
582         rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
583                              spec, ma);
584         RETURN(rc);
585 }
586
587 /** Call mdo_link() on next layer. All objects are local. */
588 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
589                     struct md_object *mo_s, const struct lu_name *lname,
590                     struct md_attr *ma)
591 {
592         int rc;
593         ENTRY;
594         rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
595                       lname, ma);
596         RETURN(rc);
597 }
598
599 /** Call mdo_unlink() on next layer. All objects are local. */
600 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
601                       struct md_object *mo_c, const struct lu_name *lname,
602                       struct md_attr *ma)
603 {
604         int rc;
605         ENTRY;
606         rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
607                         lname, ma);
608         RETURN(rc);
609 }
610
611 /**
612  * \ingroup cmm
613  * Get mode of object.
614  * Used in both cml and cmr hence can produce RPC to another server.
615  */
616 static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
617                         const struct lu_fid *lf, struct md_attr *ma,
618                         int *remote)
619 {
620         struct md_object *mo_s = md_object_find_slice(env, md, lf);
621         struct cmm_thread_info *cmi;
622         struct md_attr *tmp_ma;
623         int rc;
624         ENTRY;
625
626         if (IS_ERR(mo_s))
627                 RETURN(PTR_ERR(mo_s));
628
629         if (remote && (lu_object_exists(&mo_s->mo_lu) < 0))
630                 *remote = 1;
631
632         cmi = cmm_env_info(env);
633         tmp_ma = &cmi->cmi_ma;
634         tmp_ma->ma_need = MA_INODE;
635         tmp_ma->ma_valid = 0;
636         /* get type from src, can be remote req */
637         rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
638         if (rc == 0) {
639                 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
640                 ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid;
641                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
642                 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
643                 ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS;
644         }
645         lu_object_put(env, &mo_s->mo_lu);
646         RETURN(rc);
647 }
648
649 /**
650  * \ingroup cmm
651  * Set ctime for object.
652  * Used in both cml and cmr hence can produce RPC to another server.
653  */
654 static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
655                             const struct lu_fid *lf, struct md_attr *ma)
656 {
657         struct md_object *mo_s = md_object_find_slice(env, md, lf);
658         int rc;
659         ENTRY;
660
661         if (IS_ERR(mo_s))
662                 RETURN(PTR_ERR(mo_s));
663
664         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
665         /* set ctime to obj, can be remote req */
666         rc = mo_attr_set(env, md_object_next(mo_s), ma);
667         lu_object_put(env, &mo_s->mo_lu);
668         RETURN(rc);
669 }
670
671 /** Helper to output debug information about rename operation. */
672 static inline void cml_rename_warn(const char *fname,
673                                   struct md_object *mo_po,
674                                   struct md_object *mo_pn,
675                                   const struct lu_fid *lf,
676                                   const char *s_name,
677                                   struct md_object *mo_t,
678                                   const char *t_name,
679                                   int err)
680 {
681         if (mo_t)
682                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
683                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] "
684                       "[tname %s] [err %d]\n", fname,
685                       PFID(lu_object_fid(&mo_po->mo_lu)),
686                       PFID(lu_object_fid(&mo_pn->mo_lu)),
687                       PFID(lf), s_name,
688                       PFID(lu_object_fid(&mo_t->mo_lu)),
689                       t_name, err);
690         else
691                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
692                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] "
693                       "[tname %s] [err %d]\n", fname,
694                       PFID(lu_object_fid(&mo_po->mo_lu)),
695                       PFID(lu_object_fid(&mo_pn->mo_lu)),
696                       PFID(lf), s_name,
697                       t_name, err);
698 }
699
700 /**
701  * Rename operation for cml.
702  *
703  * This is the most complex cross-reference operation. It may consist of up to 4
704  * MDS server and require several RPCs to be sent.
705  *
706  * \param mo_po Old parent object.
707  * \param mo_pn New parent object.
708  * \param lf FID of object to rename.
709  * \param ls_name Source file name.
710  * \param mo_t target object. Should be NULL here.
711  * \param lt_name Name of target file.
712  * \param ma object attributes.
713  */
714 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
715                       struct md_object *mo_pn, const struct lu_fid *lf,
716                       const struct lu_name *ls_name, struct md_object *mo_t,
717                       const struct lu_name *lt_name, struct md_attr *ma)
718 {
719         struct cmm_thread_info *cmi;
720         struct md_attr *tmp_ma = NULL;
721         struct md_object *tmp_t = mo_t;
722         int remote = 0, rc;
723         ENTRY;
724
725         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote);
726         if (rc)
727                 RETURN(rc);
728
729         if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
730                 /**
731                  * \note \a mo_t is remote object and there is RPC to unlink it.
732                  * Before that, do local sanity check for rename first.
733                  */
734                 if (!remote) {
735                         struct md_object *mo_s = md_object_find_slice(env,
736                                                         md_obj2dev(mo_po), lf);
737                         if (IS_ERR(mo_s))
738                                 RETURN(PTR_ERR(mo_s));
739
740                         LASSERT(lu_object_exists(&mo_s->mo_lu) > 0);
741                         rc = mo_permission(env, md_object_next(mo_po),
742                                            md_object_next(mo_s),
743                                            ma, MAY_RENAME_SRC);
744                         lu_object_put(env, &mo_s->mo_lu);
745                         if (rc)
746                                 RETURN(rc);
747                 } else {
748                         rc = mo_permission(env, NULL, md_object_next(mo_po),
749                                            ma, MAY_UNLINK | MAY_VTX_FULL);
750                         if (rc)
751                                 RETURN(rc);
752                 }
753
754                 rc = mo_permission(env, NULL, md_object_next(mo_pn), ma,
755                                    MAY_UNLINK | MAY_VTX_PART);
756                 if (rc)
757                         RETURN(rc);
758
759                 /*
760                  * /note \a ma will be changed after mo_ref_del(), but we will use
761                  * it for mdo_rename() later, so save it before mo_ref_del().
762                  */
763                 cmi = cmm_env_info(env);
764                 tmp_ma = &cmi->cmi_ma;
765                 *tmp_ma = *ma;
766                 rc = mo_ref_del(env, md_object_next(mo_t), ma);
767                 if (rc)
768                         RETURN(rc);
769
770                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
771                 mo_t = NULL;
772         }
773
774         /**
775          * \note for src on remote MDS case, change its ctime before local
776          * rename. Firstly, do local sanity check for rename if necessary.
777          */
778         if (remote) {
779                 if (!tmp_ma) {
780                         rc = mo_permission(env, NULL, md_object_next(mo_po),
781                                            ma, MAY_UNLINK | MAY_VTX_FULL);
782                         if (rc)
783                                 RETURN(rc);
784
785                         if (mo_t) {
786                                 LASSERT(lu_object_exists(&mo_t->mo_lu) > 0);
787                                 rc = mo_permission(env, md_object_next(mo_pn),
788                                                    md_object_next(mo_t),
789                                                    ma, MAY_RENAME_TAR);
790                                 if (rc)
791                                         RETURN(rc);
792                         } else {
793                                 int mask;
794
795                                 if (mo_po != mo_pn)
796                                         mask = (S_ISDIR(ma->ma_attr.la_mode) ?
797                                                 MAY_LINK : MAY_CREATE);
798                                 else
799                                         mask = MAY_CREATE;
800                                 rc = mo_permission(env, NULL,
801                                                    md_object_next(mo_pn),
802                                                    NULL, mask);
803                                 if (rc)
804                                         RETURN(rc);
805                         }
806
807                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
808                 } else {
809                         LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS);
810                 }
811
812                 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf,
813                                       tmp_ma ? tmp_ma : ma);
814                 if (rc) {
815                         /* TODO: revoke mo_t if necessary. */
816                         cml_rename_warn("cmm_rename_ctime", mo_po,
817                                         mo_pn, lf, ls_name->ln_name,
818                                         tmp_t, lt_name->ln_name, rc);
819                         RETURN(rc);
820                 }
821         }
822
823         /* local rename, mo_t can be NULL */
824         rc = mdo_rename(env, md_object_next(mo_po),
825                         md_object_next(mo_pn), lf, ls_name,
826                         md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma);
827         if (rc)
828                 /* TODO: revoke all cml_rename */
829                 cml_rename_warn("mdo_rename", mo_po, mo_pn, lf,
830                                 ls_name->ln_name, tmp_t, lt_name->ln_name, rc);
831
832         RETURN(rc);
833 }
834
835 /**
836  * Rename target partial operation.
837  * Used for cross-ref rename.
838  */
839 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
840                           struct md_object *mo_t, const struct lu_fid *lf,
841                           const struct lu_name *lname, struct md_attr *ma)
842 {
843         int rc;
844         ENTRY;
845
846         rc = mdo_rename_tgt(env, md_object_next(mo_p),
847                             md_object_next(mo_t), lf, lname, ma);
848         RETURN(rc);
849 }
850
851 /**
852  * Name insert only operation.
853  * used only in case of rename_tgt() when target doesn't exist.
854  */
855 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
856                            const struct lu_name *lname, const struct lu_fid *lf,
857                            const struct md_attr *ma)
858 {
859         int rc;
860         ENTRY;
861
862         rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma);
863
864         RETURN(rc);
865 }
866
867 /**
868  * \ingroup cmm
869  * Check two fids are not subdirectories.
870  */
871 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
872                          const struct lu_fid *fid, struct lu_fid *sfid)
873 {
874         struct cmm_thread_info *cmi;
875         int rc;
876         ENTRY;
877
878         cmi = cmm_env_info(env);
879         rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL);
880         if (rc)
881                 RETURN(rc);
882
883         if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
884                 RETURN(0);
885
886         rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
887         RETURN(rc);
888 }
889
890 static const struct md_dir_operations cml_dir_ops = {
891         .mdo_is_subdir   = cmm_is_subdir,
892         .mdo_lookup      = cml_lookup,
893         .mdo_lock_mode   = cml_lock_mode,
894         .mdo_create      = cml_create,
895         .mdo_link        = cml_link,
896         .mdo_unlink      = cml_unlink,
897         .mdo_name_insert = cml_name_insert,
898         .mdo_rename      = cml_rename,
899         .mdo_rename_tgt  = cml_rename_tgt,
900         .mdo_create_data = cml_create_data,
901 };
902 /** @} */
903 /** @} */
904
905 /**
906  * \addtogroup cmr
907  * @{
908  */
909 /**
910  * \name cmr helpers
911  * @{
912  */
913 /** Get cmr_object from lu_object. */
914 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
915 {
916         return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
917 }
918 /** Get cmr_object from md_object. */
919 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
920 {
921         return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
922 }
923 /** Get cmr_object from cmm_object. */
924 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
925 {
926         return container_of0(co, struct cmr_object, cmm_obj);
927 }
928 /** @} */
929
930 /**
931  * Get proper child device from MDCs.
932  */
933 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
934 {
935         struct lu_device *next = NULL;
936         struct mdc_device *mdc;
937
938         cfs_spin_lock(&d->cmm_tgt_guard);
939         cfs_list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
940                 if (mdc->mc_num == num) {
941                         next = mdc2lu_dev(mdc);
942                         break;
943                 }
944         }
945         cfs_spin_unlock(&d->cmm_tgt_guard);
946         return next;
947 }
948
949 /**
950  * Free cmr_object.
951  */
952 static void cmr_object_free(const struct lu_env *env,
953                             struct lu_object *lo)
954 {
955         struct cmr_object *cro = lu2cmr_obj(lo);
956         lu_object_fini(lo);
957         OBD_FREE_PTR(cro);
958 }
959
960 /**
961  * Initialize cmr object.
962  */
963 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo,
964                            const struct lu_object_conf *unused)
965 {
966         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
967         struct lu_device  *c_dev;
968         struct lu_object  *c_obj;
969         int rc;
970
971         ENTRY;
972
973         c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
974         if (c_dev == NULL) {
975                 rc = -ENOENT;
976         } else {
977                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
978                                                         lo->lo_header, c_dev);
979                 if (c_obj != NULL) {
980                         lu_object_add(lo, c_obj);
981                         rc = 0;
982                 } else {
983                         rc = -ENOMEM;
984                 }
985         }
986
987         RETURN(rc);
988 }
989
990 /**
991  * Output lu_object data.
992  */
993 static int cmr_object_print(const struct lu_env *env, void *cookie,
994                             lu_printer_t p, const struct lu_object *lo)
995 {
996         const struct cmr_object *cro = lu2cmr_obj((struct lu_object *)lo);
997         return (*p)(env, cookie, "[remote](mds_num=%d)", cro->cmo_num);
998 }
999
1000 /**
1001  * Cmr instance of lu_object_operations.
1002  */
1003 static const struct lu_object_operations cmr_obj_ops = {
1004         .loo_object_init    = cmr_object_init,
1005         .loo_object_free    = cmr_object_free,
1006         .loo_object_print   = cmr_object_print
1007 };
1008
1009 /**
1010  * \name cmr remote md_object operations.
1011  * All operations here are invalid and return errors. There is no local object
1012  * so these operations return two kinds of error:
1013  * -# -EFAULT if operation is prohibited.
1014  * -# -EREMOTE if operation can be done just to notify upper level about remote
1015  *  object.
1016  *
1017  * @{
1018  */
1019 static int cmr_object_create(const struct lu_env *env,
1020                              struct md_object *mo,
1021                              const struct md_op_spec *spec,
1022                              struct md_attr *ma)
1023 {
1024         return -EFAULT;
1025 }
1026
1027 static int cmr_permission(const struct lu_env *env,
1028                           struct md_object *p, struct md_object *c,
1029                           struct md_attr *attr, int mask)
1030 {
1031         return -EREMOTE;
1032 }
1033
1034 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
1035                         struct md_attr *attr)
1036 {
1037         return -EREMOTE;
1038 }
1039
1040 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
1041                         const struct md_attr *attr)
1042 {
1043         return -EFAULT;
1044 }
1045
1046 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
1047                          struct lu_buf *buf, const char *name)
1048 {
1049         return -EFAULT;
1050 }
1051
1052 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
1053                         struct lu_buf *buf)
1054 {
1055         return -EFAULT;
1056 }
1057
1058 static int cmr_changelog(const struct lu_env *env, enum changelog_rec_type type,
1059                          int flags, struct md_object *mo)
1060 {
1061         return -EFAULT;
1062 }
1063
1064 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
1065                           struct lu_buf *buf)
1066 {
1067         return -EFAULT;
1068 }
1069
1070 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
1071                          const struct lu_buf *buf, const char *name,
1072                          int fl)
1073 {
1074         return -EFAULT;
1075 }
1076
1077 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
1078                          const char *name)
1079 {
1080         return -EFAULT;
1081 }
1082
1083 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo,
1084                        const struct md_attr *ma)
1085 {
1086         return -EFAULT;
1087 }
1088
1089 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
1090                        struct md_attr *ma)
1091 {
1092         return -EFAULT;
1093 }
1094
1095 static int cmr_open(const struct lu_env *env, struct md_object *mo,
1096                     int flags)
1097 {
1098         return -EREMOTE;
1099 }
1100
1101 static int cmr_close(const struct lu_env *env, struct md_object *mo,
1102                      struct md_attr *ma)
1103 {
1104         return -EFAULT;
1105 }
1106
1107 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
1108                         const struct lu_rdpg *rdpg)
1109 {
1110         return -EREMOTE;
1111 }
1112
1113 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
1114                         struct lustre_capa *capa, int renewal)
1115 {
1116         return -EFAULT;
1117 }
1118
1119 static int cmr_path(const struct lu_env *env, struct md_object *obj,
1120                     char *path, int pathlen, __u64 *recno, int *linkno)
1121 {
1122         return -EREMOTE;
1123 }
1124
1125 static int cmr_object_sync(const struct lu_env *env, struct md_object *mo)
1126 {
1127         return -EFAULT;
1128 }
1129
1130 static int cmr_file_lock(const struct lu_env *env, struct md_object *mo,
1131                          struct lov_mds_md *lmm, struct ldlm_extent *extent,
1132                          struct lustre_handle *lockh)
1133 {
1134         return -EREMOTE;
1135 }
1136
1137 static int cmr_file_unlock(const struct lu_env *env, struct md_object *mo,
1138                            struct lov_mds_md *lmm, struct lustre_handle *lockh)
1139 {
1140         return -EREMOTE;
1141 }
1142
1143 /**
1144  * cmr moo_version_get().
1145  */
1146 static dt_obj_version_t cmr_version_get(const struct lu_env *env,
1147                                         struct md_object *mo)
1148 {
1149         /** Don't check remote object version */
1150         return 0;
1151 }
1152
1153
1154 /**
1155  * cmr moo_version_set().
1156  * No need to update remote object version here, it is done as a part
1157  * of reintegration of partial operation on the remote server.
1158  */
1159 static void cmr_version_set(const struct lu_env *env, struct md_object *mo,
1160                             dt_obj_version_t version)
1161 {
1162         return;
1163 }
1164
1165 /** Set of md_object_operations for cmr. */
1166 static const struct md_object_operations cmr_mo_ops = {
1167         .moo_permission    = cmr_permission,
1168         .moo_attr_get      = cmr_attr_get,
1169         .moo_attr_set      = cmr_attr_set,
1170         .moo_xattr_get     = cmr_xattr_get,
1171         .moo_xattr_set     = cmr_xattr_set,
1172         .moo_xattr_list    = cmr_xattr_list,
1173         .moo_xattr_del     = cmr_xattr_del,
1174         .moo_object_create = cmr_object_create,
1175         .moo_ref_add       = cmr_ref_add,
1176         .moo_ref_del       = cmr_ref_del,
1177         .moo_open          = cmr_open,
1178         .moo_close         = cmr_close,
1179         .moo_readpage      = cmr_readpage,
1180         .moo_readlink      = cmr_readlink,
1181         .moo_changelog     = cmr_changelog,
1182         .moo_capa_get      = cmr_capa_get,
1183         .moo_object_sync   = cmr_object_sync,
1184         .moo_version_get   = cmr_version_get,
1185         .moo_version_set   = cmr_version_set,
1186         .moo_path          = cmr_path,
1187         .moo_file_lock     = cmr_file_lock,
1188         .moo_file_unlock   = cmr_file_unlock,
1189 };
1190 /** @} */
1191
1192 /**
1193  * \name cmr md_dir operations.
1194  *
1195  * All methods below are cross-ref by nature. They consist of remote call and
1196  * local operation. Due to future rollback functionality there are several
1197  * limitations for such methods:
1198  * -# remote call should be done at first to do epoch negotiation between all
1199  * MDS involved and to avoid the RPC inside transaction.
1200  * -# only one RPC can be sent - also due to epoch negotiation.
1201  * For more details see rollback HLD/DLD.
1202  * @{
1203  */
1204 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
1205                       const struct lu_name *lname, struct lu_fid *lf,
1206                       struct md_op_spec *spec)
1207 {
1208         /*
1209          * This can happens while rename() If new parent is remote dir, lookup
1210          * will happen here.
1211          */
1212
1213         return -EREMOTE;
1214 }
1215
1216 /** Return lock mode. */
1217 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
1218                                 struct md_object *mo, mdl_mode_t lm)
1219 {
1220         return MDL_MINMODE;
1221 }
1222
1223 /**
1224  * Create operation for cmr.
1225  * Remote object creation and local name insert.
1226  *
1227  * \param mo_p Parent directory. Local object.
1228  * \param lchild_name name of file to create.
1229  * \param mo_c Child object. It has no real inode yet.
1230  * \param spec creation specification.
1231  * \param ma child object attributes.
1232  */
1233 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
1234                       const struct lu_name *lchild_name, struct md_object *mo_c,
1235                       struct md_op_spec *spec,
1236                       struct md_attr *ma)
1237 {
1238         struct cmm_thread_info *cmi;
1239         struct md_attr *tmp_ma;
1240         int rc;
1241         ENTRY;
1242
1243         /* Make sure that name isn't exist before doing remote call. */
1244         rc = mdo_lookup(env, md_object_next(mo_p), lchild_name,
1245                         &cmm_env_info(env)->cmi_fid, NULL);
1246         if (rc == 0)
1247                 RETURN(-EEXIST);
1248         else if (rc != -ENOENT)
1249                 RETURN(rc);
1250
1251         /* check the SGID attr */
1252         cmi = cmm_env_info(env);
1253         LASSERT(cmi);
1254         tmp_ma = &cmi->cmi_ma;
1255         tmp_ma->ma_valid = 0;
1256         tmp_ma->ma_need = MA_INODE;
1257
1258 #ifdef CONFIG_FS_POSIX_ACL
1259         if (!S_ISLNK(ma->ma_attr.la_mode)) {
1260                 tmp_ma->ma_acl = cmi->cmi_xattr_buf;
1261                 tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
1262                 tmp_ma->ma_need |= MA_ACL_DEF;
1263         }
1264 #endif
1265         rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
1266         if (rc)
1267                 RETURN(rc);
1268
1269         if (tmp_ma->ma_attr.la_mode & S_ISGID) {
1270                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
1271                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1272                         ma->ma_attr.la_mode |= S_ISGID;
1273                         ma->ma_attr.la_valid |= LA_MODE;
1274                 }
1275         }
1276
1277 #ifdef CONFIG_FS_POSIX_ACL
1278         if (tmp_ma->ma_valid & MA_ACL_DEF) {
1279                 spec->u.sp_ea.fid = spec->u.sp_pfid;
1280                 spec->u.sp_ea.eadata = tmp_ma->ma_acl;
1281                 spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
1282                 spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
1283         }
1284 #endif
1285
1286         /* Local permission check for name_insert before remote ops. */
1287         rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1288                            (S_ISDIR(ma->ma_attr.la_mode) ?
1289                            MAY_LINK : MAY_CREATE));
1290         if (rc)
1291                 RETURN(rc);
1292
1293         /**
1294          * \note \a ma will be changed after mo_object_create(), but we will use
1295          * it for mdo_name_insert() later, so save it before mo_object_create().
1296          */
1297         *tmp_ma = *ma;
1298         rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
1299         if (rc == 0) {
1300                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1301                 rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name,
1302                                      lu_object_fid(&mo_c->mo_lu), tmp_ma);
1303                 if (unlikely(rc)) {
1304                         /* TODO: remove object mo_c on remote MDS */
1305                         CWARN("cmr_create failed, should revoke: [mo_p "DFID"]"
1306                               " [name %s] [mo_c "DFID"] [err %d]\n",
1307                               PFID(lu_object_fid(&mo_p->mo_lu)),
1308                               lchild_name->ln_name,
1309                               PFID(lu_object_fid(&mo_c->mo_lu)), rc);
1310                 }
1311         }
1312
1313         RETURN(rc);
1314 }
1315
1316 /**
1317  * Link operations for cmr.
1318  *
1319  * The link RPC is always issued to the server where source parent is living.
1320  * The first operation to do is object nlink increment on remote server.
1321  * Second one is local mdo_name_insert().
1322  *
1323  * \param mo_p parent directory. It is local.
1324  * \param mo_s source object to link. It is remote.
1325  * \param lname Name of link file.
1326  * \param ma object attributes.
1327  */
1328 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
1329                     struct md_object *mo_s, const struct lu_name *lname,
1330                     struct md_attr *ma)
1331 {
1332         int rc;
1333         ENTRY;
1334
1335         /* Make sure that name isn't exist before doing remote call. */
1336         rc = mdo_lookup(env, md_object_next(mo_p), lname,
1337                         &cmm_env_info(env)->cmi_fid, NULL);
1338         if (rc == 0) {
1339                 rc = -EEXIST;
1340         } else if (rc == -ENOENT) {
1341                 /* Local permission check for name_insert before remote ops. */
1342                 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1343                                    MAY_CREATE);
1344                 if (rc)
1345                         RETURN(rc);
1346
1347                 rc = mo_ref_add(env, md_object_next(mo_s), ma);
1348                 if (rc == 0) {
1349                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
1350                         rc = mdo_name_insert(env, md_object_next(mo_p), lname,
1351                                              lu_object_fid(&mo_s->mo_lu), ma);
1352                         if (unlikely(rc)) {
1353                                 /* TODO: ref_del from mo_s on remote MDS */
1354                                 CWARN("cmr_link failed, should revoke: "
1355                                       "[mo_p "DFID"] [mo_s "DFID"] "
1356                                       "[name %s] [err %d]\n",
1357                                       PFID(lu_object_fid(&mo_p->mo_lu)),
1358                                       PFID(lu_object_fid(&mo_s->mo_lu)),
1359                                       lname->ln_name, rc);
1360                         }
1361                 }
1362         }
1363         RETURN(rc);
1364 }
1365
1366 /**
1367  * Unlink operations for cmr.
1368  *
1369  * The unlink RPC is always issued to the server where parent is living. Hence
1370  * the first operation to do is object unlink on remote server. Second one is
1371  * local mdo_name_remove().
1372  *
1373  * \param mo_p parent md_object. It is local.
1374  * \param mo_c child object to be unlinked. It is remote.
1375  * \param lname Name of file to unlink.
1376  * \param ma object attributes.
1377  */
1378 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
1379                       struct md_object *mo_c, const struct lu_name *lname,
1380                       struct md_attr *ma)
1381 {
1382         struct cmm_thread_info *cmi;
1383         struct md_attr *tmp_ma;
1384         int rc;
1385         ENTRY;
1386
1387         /* Local permission check for name_remove before remote ops. */
1388         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1389                            MAY_UNLINK | MAY_VTX_PART);
1390         if (rc)
1391                 RETURN(rc);
1392
1393         /*
1394          * \note \a ma will be changed after mo_ref_del, but we will use
1395          * it for mdo_name_remove() later, so save it before mo_ref_del().
1396          */
1397         cmi = cmm_env_info(env);
1398         tmp_ma = &cmi->cmi_ma;
1399         *tmp_ma = *ma;
1400         rc = mo_ref_del(env, md_object_next(mo_c), ma);
1401         if (rc == 0) {
1402                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1403                 rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma);
1404                 if (unlikely(rc)) {
1405                         /* TODO: ref_add to mo_c on remote MDS */
1406                         CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]"
1407                               " [mo_c "DFID"] [name %s] [err %d]\n",
1408                               PFID(lu_object_fid(&mo_p->mo_lu)),
1409                               PFID(lu_object_fid(&mo_c->mo_lu)),
1410                               lname->ln_name, rc);
1411                 }
1412         }
1413
1414         RETURN(rc);
1415 }
1416
1417 /** Helper which outputs error message during cmr_rename() */
1418 static inline void cmr_rename_warn(const char *fname,
1419                                   struct md_object *mo_po,
1420                                   struct md_object *mo_pn,
1421                                   const struct lu_fid *lf,
1422                                   const char *s_name,
1423                                   const char *t_name,
1424                                   int err)
1425 {
1426         CWARN("cmr_rename failed for %s, should revoke: "
1427               "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] "
1428               "[sname %s] [tname %s] [err %d]\n", fname,
1429               PFID(lu_object_fid(&mo_po->mo_lu)),
1430               PFID(lu_object_fid(&mo_pn->mo_lu)),
1431               PFID(lf), s_name, t_name, err);
1432 }
1433
1434 /**
1435  * Rename operation for cmr.
1436  *
1437  * This is the most complex cross-reference operation. It may consist of up to 4
1438  * MDS server and require several RPCs to be sent.
1439  *
1440  * \param mo_po Old parent object.
1441  * \param mo_pn New parent object.
1442  * \param lf FID of object to rename.
1443  * \param ls_name Source file name.
1444  * \param mo_t target object. Should be NULL here.
1445  * \param lt_name Name of target file.
1446  * \param ma object attributes.
1447  */
1448 static int cmr_rename(const struct lu_env *env,
1449                       struct md_object *mo_po, struct md_object *mo_pn,
1450                       const struct lu_fid *lf, const struct lu_name *ls_name,
1451                       struct md_object *mo_t, const struct lu_name *lt_name,
1452                       struct md_attr *ma)
1453 {
1454         struct cmm_thread_info *cmi;
1455         struct md_attr *tmp_ma;
1456         int rc;
1457         ENTRY;
1458
1459         LASSERT(mo_t == NULL);
1460
1461         /* get real type of src */
1462         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL);
1463         if (rc)
1464                 RETURN(rc);
1465
1466         /* Local permission check for name_remove before remote ops. */
1467         rc = mo_permission(env, NULL, md_object_next(mo_po), ma,
1468                            MAY_UNLINK | MAY_VTX_FULL);
1469         if (rc)
1470                 RETURN(rc);
1471
1472         /**
1473          * \todo \a ma maybe changed after mdo_rename_tgt(), but we will use it
1474          * for mdo_name_remove() later, so save it before mdo_rename_tgt.
1475          */
1476         cmi = cmm_env_info(env);
1477         tmp_ma = &cmi->cmi_ma;
1478         *tmp_ma = *ma;
1479         /**
1480          * \note The \a mo_pn is remote directory, so we cannot even know if there is
1481          * \a mo_t or not. Therefore \a mo_t is NULL here but remote server should do
1482          * lookup and process this further.
1483          */
1484         rc = mdo_rename_tgt(env, md_object_next(mo_pn),
1485                             NULL/* mo_t */, lf, lt_name, ma);
1486         if (rc)
1487                 RETURN(rc);
1488
1489         tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1490
1491         /* src object maybe on remote MDS, do remote ops first. */
1492         rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma);
1493         if (unlikely(rc)) {
1494                 /* TODO: revoke mdo_rename_tgt */
1495                 cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf,
1496                                 ls_name->ln_name, lt_name->ln_name, rc);
1497                 RETURN(rc);
1498         }
1499
1500         /* only old name is removed localy */
1501         rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma);
1502         if (unlikely(rc))
1503                 /* TODO: revoke all cmr_rename */
1504                 cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf,
1505                                 ls_name->ln_name, lt_name->ln_name, rc);
1506
1507         RETURN(rc);
1508 }
1509
1510 /**
1511  * Part of cross-ref rename().
1512  * Used to insert new name in new parent and unlink target.
1513  */
1514 static int cmr_rename_tgt(const struct lu_env *env,
1515                           struct md_object *mo_p, struct md_object *mo_t,
1516                           const struct lu_fid *lf, const struct lu_name *lname,
1517                           struct md_attr *ma)
1518 {
1519         struct cmm_thread_info *cmi;
1520         struct md_attr *tmp_ma;
1521         int rc;
1522         ENTRY;
1523
1524         /* target object is remote one */
1525         /* Local permission check for rename_tgt before remote ops. */
1526         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1527                            MAY_UNLINK | MAY_VTX_PART);
1528         if (rc)
1529                 RETURN(rc);
1530
1531         /*
1532          * XXX: @ma maybe changed after mo_ref_del, but we will use
1533          * it for mdo_rename_tgt later, so save it before mo_ref_del.
1534          */
1535         cmi = cmm_env_info(env);
1536         tmp_ma = &cmi->cmi_ma;
1537         *tmp_ma = *ma;
1538         rc = mo_ref_del(env, md_object_next(mo_t), ma);
1539         /* continue locally with name handling only */
1540         if (rc == 0) {
1541                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1542                 rc = mdo_rename_tgt(env, md_object_next(mo_p),
1543                                     NULL, lf, lname, tmp_ma);
1544                 if (unlikely(rc)) {
1545                         /* TODO: ref_add to mo_t on remote MDS */
1546                         CWARN("cmr_rename_tgt failed, should revoke: "
1547                               "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] "
1548                               "[name %s] [err %d]\n",
1549                               PFID(lu_object_fid(&mo_p->mo_lu)),
1550                               PFID(lu_object_fid(&mo_t->mo_lu)),
1551                               PFID(lf),
1552                               lname->ln_name, rc);
1553                 }
1554         }
1555         RETURN(rc);
1556 }
1557 /** @} */
1558 /**
1559  * The md_dir_operations for cmr.
1560  */
1561 static const struct md_dir_operations cmr_dir_ops = {
1562         .mdo_is_subdir   = cmm_is_subdir,
1563         .mdo_lookup      = cmr_lookup,
1564         .mdo_lock_mode   = cmr_lock_mode,
1565         .mdo_create      = cmr_create,
1566         .mdo_link        = cmr_link,
1567         .mdo_unlink      = cmr_unlink,
1568         .mdo_rename      = cmr_rename,
1569         .mdo_rename_tgt  = cmr_rename_tgt
1570 };
1571 /** @} */