Whamcloud - gitweb
24a07d69fa77f1937849aaf1cafabc8c1557fc89
[fs/lustre-release.git] / lustre / lod / lod_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2012, Intel, Inc.
27  */
28 /*
29  * lustre/lod/lod_object.c
30  *
31  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
32  */
33
34 #ifndef EXPORT_SYMTAB
35 # define EXPORT_SYMTAB
36 #endif
37 #define DEBUG_SUBSYSTEM S_MDS
38
39 #include <obd.h>
40 #include <obd_class.h>
41 #include <lustre_ver.h>
42 #include <obd_support.h>
43 #include <lprocfs_status.h>
44
45 #include <lustre_fid.h>
46 #include <lustre_param.h>
47 #include <lustre_fid.h>
48 #include <obd_lov.h>
49
50 #include "lod_internal.h"
51
52 extern cfs_mem_cache_t *lod_object_kmem;
53 static const struct dt_body_operations lod_body_lnk_ops;
54
55 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
56                             struct dt_rec *rec, const struct dt_key *key,
57                             struct lustre_capa *capa)
58 {
59         struct dt_object *next = dt_object_child(dt);
60         return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
61 }
62
63 static int lod_declare_index_insert(const struct lu_env *env,
64                                     struct dt_object *dt,
65                                     const struct dt_rec *rec,
66                                     const struct dt_key *key,
67                                     struct thandle *handle)
68 {
69         return dt_declare_insert(env, dt_object_child(dt), rec, key, handle);
70 }
71
72 static int lod_index_insert(const struct lu_env *env,
73                             struct dt_object *dt,
74                             const struct dt_rec *rec,
75                             const struct dt_key *key,
76                             struct thandle *th,
77                             struct lustre_capa *capa,
78                             int ign)
79 {
80         return dt_insert(env, dt_object_child(dt), rec, key, th, capa, ign);
81 }
82
83 static int lod_declare_index_delete(const struct lu_env *env,
84                                     struct dt_object *dt,
85                                     const struct dt_key *key,
86                                     struct thandle *th)
87 {
88         return dt_declare_delete(env, dt_object_child(dt), key, th);
89 }
90
91 static int lod_index_delete(const struct lu_env *env,
92                             struct dt_object *dt,
93                             const struct dt_key *key,
94                             struct thandle *th,
95                             struct lustre_capa *capa)
96 {
97         return dt_delete(env, dt_object_child(dt), key, th, capa);
98 }
99
100 static struct dt_it *lod_it_init(const struct lu_env *env,
101                                  struct dt_object *dt, __u32 attr,
102                                  struct lustre_capa *capa)
103 {
104         struct dt_object   *next = dt_object_child(dt);
105
106         return next->do_index_ops->dio_it.init(env, next, attr, capa);
107 }
108
109 static struct dt_index_operations lod_index_ops = {
110         .dio_lookup         = lod_index_lookup,
111         .dio_declare_insert = lod_declare_index_insert,
112         .dio_insert         = lod_index_insert,
113         .dio_declare_delete = lod_declare_index_delete,
114         .dio_delete         = lod_index_delete,
115         .dio_it     = {
116                 .init       = lod_it_init,
117         }
118 };
119
120 static void lod_object_read_lock(const struct lu_env *env,
121                                  struct dt_object *dt, unsigned role)
122 {
123         dt_read_lock(env, dt_object_child(dt), role);
124 }
125
126 static void lod_object_write_lock(const struct lu_env *env,
127                                   struct dt_object *dt, unsigned role)
128 {
129         dt_write_lock(env, dt_object_child(dt), role);
130 }
131
132 static void lod_object_read_unlock(const struct lu_env *env,
133                                    struct dt_object *dt)
134 {
135         dt_read_unlock(env, dt_object_child(dt));
136 }
137
138 static void lod_object_write_unlock(const struct lu_env *env,
139                                     struct dt_object *dt)
140 {
141         dt_write_unlock(env, dt_object_child(dt));
142 }
143
144 static int lod_object_write_locked(const struct lu_env *env,
145                                    struct dt_object *dt)
146 {
147         return dt_write_locked(env, dt_object_child(dt));
148 }
149
150 static int lod_attr_get(const struct lu_env *env,
151                         struct dt_object *dt,
152                         struct lu_attr *attr,
153                         struct lustre_capa *capa)
154 {
155         return dt_attr_get(env, dt_object_child(dt), attr, capa);
156 }
157
158 static int lod_declare_attr_set(const struct lu_env *env,
159                                 struct dt_object *dt,
160                                 const struct lu_attr *attr,
161                                 struct thandle *handle)
162 {
163         struct dt_object  *next = dt_object_child(dt);
164         struct lod_object *lo = lod_dt_obj(dt);
165         int                rc, i;
166         ENTRY;
167
168         /*
169          * declare setattr on the local object
170          */
171         rc = dt_declare_attr_set(env, next, attr, handle);
172         if (rc)
173                 RETURN(rc);
174
175         /*
176          * load striping information, notice we don't do this when object
177          * is being initialized as we don't need this information till
178          * few specific cases like destroy, chown
179          */
180         rc = lod_load_striping(env, lo);
181         if (rc)
182                 RETURN(rc);
183
184         /*
185          * if object is striped declare changes on the stripes
186          */
187         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
188         for (i = 0; i < lo->ldo_stripenr; i++) {
189                 LASSERT(lo->ldo_stripe[i]);
190                 rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle);
191                 if (rc) {
192                         CERROR("failed declaration: %d\n", rc);
193                         break;
194                 }
195         }
196
197         RETURN(rc);
198 }
199
200 static int lod_attr_set(const struct lu_env *env,
201                         struct dt_object *dt,
202                         const struct lu_attr *attr,
203                         struct thandle *handle,
204                         struct lustre_capa *capa)
205 {
206         struct dt_object  *next = dt_object_child(dt);
207         struct lod_object *lo = lod_dt_obj(dt);
208         int                rc, i;
209         ENTRY;
210
211         /*
212          * apply changes to the local object
213          */
214         rc = dt_attr_set(env, next, attr, handle, capa);
215         if (rc)
216                 RETURN(rc);
217
218         /*
219          * if object is striped, apply changes to all the stripes
220          */
221         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
222         for (i = 0; i < lo->ldo_stripenr; i++) {
223                 LASSERT(lo->ldo_stripe[i]);
224                 rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
225                 if (rc) {
226                         CERROR("failed declaration: %d\n", rc);
227                         break;
228                 }
229         }
230
231         RETURN(rc);
232 }
233
234 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
235                          struct lu_buf *buf, const char *name,
236                          struct lustre_capa *capa)
237 {
238         struct lod_thread_info  *info = lod_env_info(env);
239         struct lod_device       *dev = lu2lod_dev(dt->do_lu.lo_dev);
240         int                      rc, is_root;
241         ENTRY;
242
243         rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
244         if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
245                 RETURN(rc);
246
247         /*
248          * lod returns default striping on the real root of the device
249          * this is like the root stores default striping for the whole
250          * filesystem. historically we've been using a different approach
251          * and store it in the config.
252          */
253         dt_root_get(env, dev->lod_child, &info->lti_fid);
254         is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
255
256         if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
257                 struct lov_user_md *lum = buf->lb_buf;
258                 struct lov_desc    *desc = &dev->lod_desc;
259
260                 if (buf->lb_buf == NULL) {
261                         rc = sizeof(struct lov_user_md_v1);
262                 } else if (buf->lb_len >= sizeof(struct lov_user_md_v1)) {
263                         lum->lmm_magic = LOV_USER_MAGIC_V1;
264                         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
265                         lum->lmm_pattern = desc->ld_pattern;
266                         lum->lmm_stripe_size = desc->ld_default_stripe_size;
267                         lum->lmm_stripe_count = desc->ld_default_stripe_count;
268                         lum->lmm_stripe_offset = desc->ld_default_stripe_offset;
269                         rc = sizeof(struct lov_user_md_v1);
270                 } else {
271                         rc = -ERANGE;
272                 }
273         }
274
275         RETURN(rc);
276 }
277
278 /*
279  * LOV xattr is a storage for striping, and LOD owns this xattr.
280  * but LOD allows others to control striping to some extent
281  * - to reset strping
282  * - to set new defined striping
283  * - to set new semi-defined striping
284  *   - number of stripes is defined
285  *   - number of stripes + osts are defined
286  *   - ??
287  */
288 static int lod_declare_xattr_set(const struct lu_env *env,
289                                  struct dt_object *dt,
290                                  const struct lu_buf *buf,
291                                  const char *name, int fl,
292                                  struct thandle *th)
293 {
294         struct dt_object *next = dt_object_child(dt);
295         struct lu_attr   *attr = &lod_env_info(env)->lti_attr;
296         __u32             mode;
297         int               rc;
298         ENTRY;
299
300         /*
301          * allow to declare predefined striping on a new (!mode) object
302          * which is supposed to be replay of regular file creation
303          * (when LOV setting is declared)
304          */
305         mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
306         if ((S_ISREG(mode) || !mode) && !strcmp(name, XATTR_NAME_LOV)) {
307                 /*
308                  * this is a request to manipulate object's striping
309                  */
310                 if (dt_object_exists(dt)) {
311                         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
312                         if (rc)
313                                 RETURN(rc);
314                 } else {
315                         memset(attr, 0, sizeof(attr));
316                         attr->la_valid = LA_TYPE | LA_MODE;
317                         attr->la_mode = S_IFREG;
318                 }
319                 rc = lod_declare_striped_object(env, dt, attr, buf, th);
320                 if (rc)
321                         RETURN(rc);
322         }
323
324         rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
325
326         RETURN(rc);
327 }
328
329 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
330                                     struct dt_object *dt,
331                                     const struct lu_buf *buf,
332                                     const char *name, int fl,
333                                     struct thandle *th,
334                                     struct lustre_capa *capa)
335 {
336         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
337         struct dt_object        *next = dt_object_child(dt);
338         struct lod_object       *l = lod_dt_obj(dt);
339         struct lov_user_md_v1   *lum;
340         struct lov_user_md_v3   *v3 = NULL;
341         int                      rc;
342         ENTRY;
343
344         LASSERT(l->ldo_stripe == NULL);
345         l->ldo_striping_cached = 0;
346         l->ldo_def_striping_set = 0;
347         lod_object_set_pool(l, NULL);
348         l->ldo_def_stripe_size = 0;
349         l->ldo_def_stripenr = 0;
350
351         LASSERT(buf);
352         LASSERT(buf->lb_buf);
353         lum = buf->lb_buf;
354
355         rc = lod_verify_striping(d, buf, 0);
356         if (rc)
357                 RETURN(rc);
358
359         if (lum->lmm_magic == LOV_USER_MAGIC_V3)
360                 v3 = buf->lb_buf;
361
362         /* if { size, offset, count } = { 0, -1, 0 } and no pool
363          * (i.e. all default values specified) then delete default
364          * striping from dir. */
365         CDEBUG(D_OTHER,
366                 "set default striping: sz %u # %u offset %d %s %s\n",
367                 (unsigned)lum->lmm_stripe_size,
368                 (unsigned)lum->lmm_stripe_count,
369                 (int)lum->lmm_stripe_offset,
370                 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
371
372         if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
373                                 (lum->lmm_stripe_count),
374                                 (lum->lmm_stripe_offset)) &&
375                         lum->lmm_magic == LOV_USER_MAGIC_V1) {
376                 rc = dt_xattr_del(env, next, name, th, capa);
377                 if (rc == -ENODATA)
378                         rc = 0;
379         } else {
380                 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
381         }
382
383         RETURN(rc);
384 }
385
386 static int lod_xattr_set(const struct lu_env *env,
387                          struct dt_object *dt, const struct lu_buf *buf,
388                          const char *name, int fl, struct thandle *th,
389                          struct lustre_capa *capa)
390 {
391         struct dt_object *next = dt_object_child(dt);
392         __u32             attr;
393         int               rc;
394         ENTRY;
395
396         attr = dt->do_lu.lo_header->loh_attr & S_IFMT;
397         if (S_ISDIR(attr)) {
398                 if (strncmp(name, XATTR_NAME_LOV, strlen(XATTR_NAME_LOV)) == 0)
399                         rc = lod_xattr_set_lov_on_dir(env, dt, buf, name,
400                                                       fl, th, capa);
401                 else
402                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
403
404         } else if (S_ISREG(attr) && !strcmp(name, XATTR_NAME_LOV)) {
405                 /*
406                  * XXX: check striping match what we already have
407                  * during req replay, declare_xattr_set() defines striping,
408                  * then create() does the work
409                  */
410                 rc = lod_striping_create(env, dt, NULL, NULL, th);
411                 RETURN(rc);
412         } else {
413                 /*
414                  * behave transparantly for all other EAs
415                  */
416                 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
417         }
418
419         RETURN(rc);
420 }
421
422 static int lod_declare_xattr_del(const struct lu_env *env,
423                                  struct dt_object *dt, const char *name,
424                                  struct thandle *th)
425 {
426         return dt_declare_xattr_del(env, dt_object_child(dt), name, th);
427 }
428
429 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
430                          const char *name, struct thandle *th,
431                          struct lustre_capa *capa)
432 {
433         return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
434 }
435
436 static int lod_xattr_list(const struct lu_env *env,
437                           struct dt_object *dt, struct lu_buf *buf,
438                           struct lustre_capa *capa)
439 {
440         return dt_xattr_list(env, dt_object_child(dt), buf, capa);
441 }
442
443 int lod_object_set_pool(struct lod_object *o, char *pool)
444 {
445         int len;
446
447         if (o->ldo_pool) {
448                 len = strlen(o->ldo_pool);
449                 OBD_FREE(o->ldo_pool, len + 1);
450                 o->ldo_pool = NULL;
451         }
452         if (pool) {
453                 len = strlen(pool);
454                 OBD_ALLOC(o->ldo_pool, len + 1);
455                 if (o->ldo_pool == NULL)
456                         return -ENOMEM;
457                 strcpy(o->ldo_pool, pool);
458         }
459         return 0;
460 }
461
462 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
463 {
464         return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
465 }
466
467 static int lod_cache_parent_striping(const struct lu_env *env,
468                                      struct lod_object *lp)
469 {
470         struct lov_user_md_v1   *v1 = NULL;
471         struct lov_user_md_v3   *v3 = NULL;
472         int                      rc;
473         ENTRY;
474
475         /* dt_ah_init() is called from MDD without parent being write locked
476          * lock it here */
477         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
478         if (lp->ldo_striping_cached)
479                 GOTO(unlock, rc = 0);
480
481         rc = lod_get_lov_ea(env, lp);
482         if (rc < 0)
483                 GOTO(unlock, rc);
484
485         if (rc < sizeof(struct lov_user_md)) {
486                 /* don't lookup for non-existing or invalid striping */
487                 lp->ldo_def_striping_set = 0;
488                 lp->ldo_striping_cached = 1;
489                 lp->ldo_def_stripe_size = 0;
490                 lp->ldo_def_stripenr = 0;
491                 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
492                 GOTO(unlock, rc = 0);
493         }
494
495         v1 = (struct lov_user_md_v1 *)lod_env_info(env)->lti_ea_store;
496         if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1))
497                 lustre_swab_lov_user_md_v1(v1);
498         else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3))
499                 lustre_swab_lov_user_md_v3(v3);
500
501         if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
502                 GOTO(unlock, rc = 0);
503
504         if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
505                 GOTO(unlock, rc = 0);
506
507         lp->ldo_def_stripenr = v1->lmm_stripe_count;
508         lp->ldo_def_stripe_size = v1->lmm_stripe_size;
509         lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
510         lp->ldo_striping_cached = 1;
511         lp->ldo_def_striping_set = 1;
512
513         if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
514                 /* XXX: sanity check here */
515                 v3 = (struct lov_user_md_v3 *) v1;
516                 if (v3->lmm_pool_name[0])
517                         lod_object_set_pool(lp, v3->lmm_pool_name);
518         }
519
520         CDEBUG(D_OTHER, "def. striping: # %d, sz %d, off %d %s%s on "DFID"\n",
521                lp->ldo_def_stripenr, lp->ldo_def_stripe_size,
522                lp->ldo_def_stripe_offset, v3 ? "from " : "",
523                v3 ? lp->ldo_pool : "", PFID(lu_object_fid(&lp->ldo_obj.do_lu)));
524
525         EXIT;
526 unlock:
527         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
528         return rc;
529 }
530
531 /**
532  * used to transfer default striping data to the object being created
533  */
534 static void lod_ah_init(const struct lu_env *env,
535                         struct dt_allocation_hint *ah,
536                         struct dt_object *parent,
537                         struct dt_object *child,
538                         cfs_umode_t child_mode)
539 {
540         struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
541         struct dt_object  *nextp = NULL;
542         struct dt_object  *nextc;
543         struct lod_object *lp = NULL;
544         struct lod_object *lc;
545         struct lov_desc   *desc;
546         ENTRY;
547
548         LASSERT(child);
549
550         if (likely(parent)) {
551                 nextp = dt_object_child(parent);
552                 lp = lod_dt_obj(parent);
553         }
554
555         nextc = dt_object_child(child);
556         lc = lod_dt_obj(child);
557
558         LASSERT(lc->ldo_stripenr == 0);
559         LASSERT(lc->ldo_stripe == NULL);
560
561         /*
562          * local object may want some hints
563          * in case of late striping creation, ->ah_init()
564          * can be called with local object existing
565          */
566         if (!dt_object_exists(nextc))
567                 nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
568
569         if (S_ISDIR(child_mode)) {
570                 if (lp->ldo_striping_cached == 0) {
571                         /* we haven't tried to get default striping for
572                          * the directory yet, let's cache it in the object */
573                         lod_cache_parent_striping(env, lp);
574                 }
575                 /* transfer defaults to new directory */
576                 if (lp->ldo_striping_cached) {
577                         if (lp->ldo_pool)
578                                 lod_object_set_pool(lc, lp->ldo_pool);
579                         lc->ldo_def_stripenr = lp->ldo_def_stripenr;
580                         lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
581                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
582                         lc->ldo_striping_cached = 1;
583                         lc->ldo_def_striping_set = 1;
584                         CDEBUG(D_OTHER, "inherite striping defaults\n");
585                 }
586                 return;
587         }
588
589         /*
590          * if object is going to be striped over OSTs, transfer default
591          * striping information to the child, so that we can use it
592          * during declaration and creation
593          */
594         if (!lod_object_will_be_striped(S_ISREG(child_mode),
595                                         lu_object_fid(&child->do_lu)))
596                 return;
597
598         /*
599          * try from the parent
600          */
601         if (likely(parent)) {
602                 if (lp->ldo_striping_cached == 0) {
603                         /* we haven't tried to get default striping for
604                          * the directory yet, let's cache it in the object */
605                         lod_cache_parent_striping(env, lp);
606                 }
607
608                 lc->ldo_def_stripe_offset = (__u16) -1;
609
610                 if (lp->ldo_def_striping_set) {
611                         if (lp->ldo_pool)
612                                 lod_object_set_pool(lc, lp->ldo_pool);
613                         lc->ldo_stripenr = lp->ldo_def_stripenr;
614                         lc->ldo_stripe_size = lp->ldo_def_stripe_size;
615                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
616                         CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
617                                lc->ldo_stripenr, lc->ldo_stripe_size,
618                                lp->ldo_pool ? lp->ldo_pool : "");
619                 }
620         }
621
622         /*
623          * if the parent doesn't provide with specific pattern, grab fs-wide one
624          */
625         desc = &d->lod_desc;
626         if (lc->ldo_stripenr == 0)
627                 lc->ldo_stripenr = desc->ld_default_stripe_count;
628         if (lc->ldo_stripe_size == 0)
629                 lc->ldo_stripe_size = desc->ld_default_stripe_size;
630         CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
631                lc->ldo_stripenr, lc->ldo_stripe_size,
632                lc->ldo_pool ? lc->ldo_pool : "");
633
634         EXIT;
635 }
636
637 /**
638  * Create declaration of striped object
639  */
640 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
641                                struct lu_attr *attr,
642                                const struct lu_buf *lovea, struct thandle *th)
643 {
644         struct lod_thread_info  *info = lod_env_info(env);
645         struct dt_object        *next = dt_object_child(dt);
646         struct lod_object       *lo = lod_dt_obj(dt);
647         int                      rc;
648         ENTRY;
649
650         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
651                 /* failed to create striping, let's reset
652                  * config so that others don't get confused */
653                 lod_object_free_striping(env, lo);
654                 GOTO(out, rc = -ENOMEM);
655         }
656
657         /* choose OST and generate appropriate objects */
658         rc = lod_qos_prep_create(env, lo, attr, lovea, th);
659         if (rc) {
660                 /* failed to create striping, let's reset
661                  * config so that others don't get confused */
662                 lod_object_free_striping(env, lo);
663                 GOTO(out, rc);
664         }
665
666         /*
667          * declare storage for striping data
668          */
669         info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
670                                 lo->ldo_pool ?  LOV_MAGIC_V3 : LOV_MAGIC_V1);
671         rc = dt_declare_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV,
672                                   0, th);
673         if (rc)
674                 GOTO(out, rc);
675
676 out:
677         RETURN(rc);
678 }
679
680 static int lod_declare_object_create(const struct lu_env *env,
681                                      struct dt_object *dt,
682                                      struct lu_attr *attr,
683                                      struct dt_allocation_hint *hint,
684                                      struct dt_object_format *dof,
685                                      struct thandle *th)
686 {
687         struct dt_object   *next = dt_object_child(dt);
688         struct lod_object  *lo = lod_dt_obj(dt);
689         int                 rc;
690         ENTRY;
691
692         LASSERT(dof);
693         LASSERT(attr);
694         LASSERT(th);
695         LASSERT(!dt_object_exists(next));
696
697         /*
698          * first of all, we declare creation of local object
699          */
700         rc = dt_declare_create(env, next, attr, hint, dof, th);
701         if (rc)
702                 GOTO(out, rc);
703
704         if (dof->dof_type == DFT_SYM)
705                 dt->do_body_ops = &lod_body_lnk_ops;
706
707         /*
708          * it's lod_ah_init() who has decided the object will striped
709          */
710         if (dof->dof_type == DFT_REGULAR) {
711                 /* callers don't want stripes */
712                 /* XXX: all tricky interactions with ->ah_make_hint() decided
713                  * to use striping, then ->declare_create() behaving differently
714                  * should be cleaned */
715                 if (dof->u.dof_reg.striped == 0)
716                         lo->ldo_stripenr = 0;
717                 if (lo->ldo_stripenr > 0)
718                         rc = lod_declare_striped_object(env, dt, attr,
719                                                         NULL, th);
720         } else if (dof->dof_type == DFT_DIR && lo->ldo_striping_cached) {
721                 struct lod_thread_info *info = lod_env_info(env);
722
723                 info->lti_buf.lb_buf = NULL;
724                 info->lti_buf.lb_len = sizeof(struct lov_user_md_v3);
725                 /* to transfer default striping from the parent */
726                 rc = dt_declare_xattr_set(env, next, &info->lti_buf,
727                                           XATTR_NAME_LOV, 0, th);
728         }
729
730 out:
731         RETURN(rc);
732 }
733
734 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
735                         struct lu_attr *attr, struct dt_object_format *dof,
736                         struct thandle *th)
737 {
738         struct lod_object *lo = lod_dt_obj(dt);
739         int                rc = 0, i;
740         ENTRY;
741
742         LASSERT(lo->ldo_stripe);
743         LASSERT(lo->ldo_stripe > 0);
744         LASSERT(lo->ldo_striping_cached == 0);
745
746         /* create all underlying objects */
747         for (i = 0; i < lo->ldo_stripenr; i++) {
748                 LASSERT(lo->ldo_stripe[i]);
749                 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
750
751                 if (rc)
752                         break;
753         }
754         if (rc == 0)
755                 rc = lod_generate_and_set_lovea(env, lo, th);
756
757         RETURN(rc);
758 }
759
760 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
761                              struct lu_attr *attr,
762                              struct dt_allocation_hint *hint,
763                              struct dt_object_format *dof, struct thandle *th)
764 {
765         struct dt_object   *next = dt_object_child(dt);
766         struct lod_object  *lo = lod_dt_obj(dt);
767         int                 rc;
768         ENTRY;
769
770         /* create local object */
771         rc = dt_create(env, next, attr, hint, dof, th);
772
773         if (rc == 0) {
774                 if (S_ISDIR(dt->do_lu.lo_header->loh_attr))
775                         rc = lod_store_def_striping(env, dt, th);
776                 else if (lo->ldo_stripe)
777                         rc = lod_striping_create(env, dt, attr, dof, th);
778         }
779
780         RETURN(rc);
781 }
782
783 static int lod_declare_object_destroy(const struct lu_env *env,
784                                       struct dt_object *dt,
785                                       struct thandle *th)
786 {
787         struct dt_object   *next = dt_object_child(dt);
788         struct lod_object  *lo = lod_dt_obj(dt);
789         int                 rc, i;
790         ENTRY;
791
792         /*
793          * we declare destroy for the local object
794          */
795         rc = dt_declare_destroy(env, next, th);
796         if (rc)
797                 RETURN(rc);
798
799         /*
800          * load striping information, notice we don't do this when object
801          * is being initialized as we don't need this information till
802          * few specific cases like destroy, chown
803          */
804         rc = lod_load_striping(env, lo);
805         if (rc)
806                 RETURN(rc);
807
808         /* declare destroy for all underlying objects */
809         for (i = 0; i < lo->ldo_stripenr; i++) {
810                 LASSERT(lo->ldo_stripe[i]);
811                 rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
812
813                 if (rc)
814                         break;
815         }
816
817         RETURN(rc);
818 }
819
820 static int lod_object_destroy(const struct lu_env *env,
821                 struct dt_object *dt, struct thandle *th)
822 {
823         struct dt_object  *next = dt_object_child(dt);
824         struct lod_object *lo = lod_dt_obj(dt);
825         int                rc, i;
826         ENTRY;
827
828         /* destroy local object */
829         rc = dt_destroy(env, next, th);
830         if (rc)
831                 RETURN(rc);
832
833         /* destroy all underlying objects */
834         for (i = 0; i < lo->ldo_stripenr; i++) {
835                 LASSERT(lo->ldo_stripe[i]);
836                 rc = dt_destroy(env, lo->ldo_stripe[i], th);
837                 if (rc)
838                         break;
839         }
840
841         RETURN(rc);
842 }
843
844 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
845                          const struct dt_index_features *feat)
846 {
847         struct dt_object *next = dt_object_child(dt);
848         int               rc;
849         ENTRY;
850
851         LASSERT(next->do_ops);
852         LASSERT(next->do_ops->do_index_try);
853
854         rc = next->do_ops->do_index_try(env, next, feat);
855         if (next->do_index_ops && dt->do_index_ops == NULL) {
856                 dt->do_index_ops = &lod_index_ops;
857                 /* XXX: iterators don't accept device, so bypass LOD */
858                 /* will be fixed with DNE */
859                 if (lod_index_ops.dio_it.fini == NULL) {
860                         lod_index_ops.dio_it = next->do_index_ops->dio_it;
861                         lod_index_ops.dio_it.init = lod_it_init;
862                 }
863         }
864
865         RETURN(rc);
866 }
867
868 static int lod_declare_ref_add(const struct lu_env *env,
869                                struct dt_object *dt, struct thandle *th)
870 {
871         return dt_declare_ref_add(env, dt_object_child(dt), th);
872 }
873
874 static int lod_ref_add(const struct lu_env *env,
875                        struct dt_object *dt, struct thandle *th)
876 {
877         return dt_ref_add(env, dt_object_child(dt), th);
878 }
879
880 static int lod_declare_ref_del(const struct lu_env *env,
881                                struct dt_object *dt, struct thandle *th)
882 {
883         return dt_declare_ref_del(env, dt_object_child(dt), th);
884 }
885
886 static int lod_ref_del(const struct lu_env *env,
887                        struct dt_object *dt, struct thandle *th)
888 {
889         return dt_ref_del(env, dt_object_child(dt), th);
890 }
891
892 static struct obd_capa *lod_capa_get(const struct lu_env *env,
893                                      struct dt_object *dt,
894                                      struct lustre_capa *old, __u64 opc)
895 {
896         return dt_capa_get(env, dt_object_child(dt), old, opc);
897 }
898
899 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt)
900 {
901         return dt_object_sync(env, dt_object_child(dt));
902 }
903
904 struct dt_object_operations lod_obj_ops = {
905         .do_read_lock           = lod_object_read_lock,
906         .do_write_lock          = lod_object_write_lock,
907         .do_read_unlock         = lod_object_read_unlock,
908         .do_write_unlock        = lod_object_write_unlock,
909         .do_write_locked        = lod_object_write_locked,
910         .do_attr_get            = lod_attr_get,
911         .do_declare_attr_set    = lod_declare_attr_set,
912         .do_attr_set            = lod_attr_set,
913         .do_xattr_get           = lod_xattr_get,
914         .do_declare_xattr_set   = lod_declare_xattr_set,
915         .do_xattr_set           = lod_xattr_set,
916         .do_declare_xattr_del   = lod_declare_xattr_del,
917         .do_xattr_del           = lod_xattr_del,
918         .do_xattr_list          = lod_xattr_list,
919         .do_ah_init             = lod_ah_init,
920         .do_declare_create      = lod_declare_object_create,
921         .do_create              = lod_object_create,
922         .do_declare_destroy     = lod_declare_object_destroy,
923         .do_destroy             = lod_object_destroy,
924         .do_index_try           = lod_index_try,
925         .do_declare_ref_add     = lod_declare_ref_add,
926         .do_ref_add             = lod_ref_add,
927         .do_declare_ref_del     = lod_declare_ref_del,
928         .do_ref_del             = lod_ref_del,
929         .do_capa_get            = lod_capa_get,
930         .do_object_sync         = lod_object_sync,
931 };
932
933 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
934                         struct lu_buf *buf, loff_t *pos,
935                         struct lustre_capa *capa)
936 {
937         struct dt_object *next = dt_object_child(dt);
938         return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
939 }
940
941 static ssize_t lod_declare_write(const struct lu_env *env,
942                                  struct dt_object *dt,
943                                  const loff_t size, loff_t pos,
944                                  struct thandle *th)
945 {
946         return dt_declare_record_write(env, dt_object_child(dt),
947                                        size, pos, th);
948 }
949
950 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
951                          const struct lu_buf *buf, loff_t *pos,
952                          struct thandle *th, struct lustre_capa *capa, int iq)
953 {
954         struct dt_object *next = dt_object_child(dt);
955         LASSERT(next);
956         return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
957 }
958
959 static const struct dt_body_operations lod_body_lnk_ops = {
960         .dbo_read               = lod_read,
961         .dbo_declare_write      = lod_declare_write,
962         .dbo_write              = lod_write
963 };
964
965 static int lod_object_init(const struct lu_env *env, struct lu_object *o,
966                            const struct lu_object_conf *conf)
967 {
968         struct lod_device *d = lu2lod_dev(o->lo_dev);
969         struct lu_object  *below;
970         struct lu_device  *under;
971         ENTRY;
972
973         /*
974          * create local object
975          */
976         under = &d->lod_child->dd_lu_dev;
977         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
978         if (below == NULL)
979                 RETURN(-ENOMEM);
980
981         lu_object_add(o, below);
982
983         RETURN(0);
984 }
985
986 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
987 {
988         int i;
989
990         if (lo->ldo_stripe) {
991                 LASSERT(lo->ldo_stripes_allocated > 0);
992
993                 for (i = 0; i < lo->ldo_stripenr; i++) {
994                         if (lo->ldo_stripe[i])
995                                 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
996                 }
997
998                 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
999                 OBD_FREE(lo->ldo_stripe, i);
1000                 lo->ldo_stripe = NULL;
1001                 lo->ldo_stripes_allocated = 0;
1002         }
1003         lo->ldo_stripenr = 0;
1004 }
1005
1006 /*
1007  * ->start is called once all slices are initialized, including header's
1008  * cache for mode (object type). using the type we can initialize ops
1009  */
1010 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
1011 {
1012         if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
1013                 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
1014         return 0;
1015 }
1016
1017 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
1018 {
1019         struct lod_object *mo = lu2lod_obj(o);
1020
1021         /*
1022          * release all underlying object pinned
1023          */
1024
1025         lod_object_free_striping(env, mo);
1026
1027         lod_object_set_pool(mo, NULL);
1028
1029         lu_object_fini(o);
1030         OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
1031 }
1032
1033 static void lod_object_release(const struct lu_env *env, struct lu_object *o)
1034 {
1035         /* XXX: shouldn't we release everything here in case if object
1036          * creation failed before? */
1037 }
1038
1039 static int lod_object_print(const struct lu_env *env, void *cookie,
1040                             lu_printer_t p, const struct lu_object *l)
1041 {
1042         struct lod_object *o = lu2lod_obj((struct lu_object *) l);
1043
1044         return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
1045 }
1046
1047 struct lu_object_operations lod_lu_obj_ops = {
1048         .loo_object_init        = lod_object_init,
1049         .loo_object_start       = lod_object_start,
1050         .loo_object_free        = lod_object_free,
1051         .loo_object_release     = lod_object_release,
1052         .loo_object_print       = lod_object_print,
1053 };