Whamcloud - gitweb
65649be6bee87e06f42f83d686935d2f60bb3391
[fs/lustre-release.git] / lustre / obdclass / dt_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/dt_object.c
37  *
38  * Dt Object.
39  * Generic functions from dt_object.h
40  *
41  * Author: Nikita Danilov <nikita@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_CLASS
45
46 #include <obd.h>
47 #include <dt_object.h>
48 #include <libcfs/list.h>
49 /* fid_be_to_cpu() */
50 #include <lustre_fid.h>
51
52 #include <lustre_quota.h>
53
54 /* context key constructor/destructor: dt_global_key_init, dt_global_key_fini */
55 LU_KEY_INIT(dt_global, struct dt_thread_info);
56 LU_KEY_FINI(dt_global, struct dt_thread_info);
57
58 struct lu_context_key dt_key = {
59         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD | LCT_LOCAL,
60         .lct_init = dt_global_key_init,
61         .lct_fini = dt_global_key_fini
62 };
63
64 /* no lock is necessary to protect the list, because call-backs
65  * are added during system startup. Please refer to "struct dt_device".
66  */
67 void dt_txn_callback_add(struct dt_device *dev, struct dt_txn_callback *cb)
68 {
69         list_add(&cb->dtc_linkage, &dev->dd_txn_callbacks);
70 }
71 EXPORT_SYMBOL(dt_txn_callback_add);
72
73 void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb)
74 {
75         list_del_init(&cb->dtc_linkage);
76 }
77 EXPORT_SYMBOL(dt_txn_callback_del);
78
79 int dt_txn_hook_start(const struct lu_env *env,
80                       struct dt_device *dev, struct thandle *th)
81 {
82         int rc = 0;
83         struct dt_txn_callback *cb;
84
85         if (th->th_local)
86                 return 0;
87
88         list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
89                 struct thandle *dtc_th = th;
90
91                 if (cb->dtc_txn_start == NULL ||
92                     !(cb->dtc_tag & env->le_ctx.lc_tags))
93                         continue;
94
95                 /* Usually dt_txn_hook_start is called from bottom device,
96                  * and if the thandle has th_top, then we need use top
97                  * thandle for the callback in the top thandle layer */
98                 if (th->th_top != NULL)
99                         dtc_th = th->th_top;
100
101                 rc = cb->dtc_txn_start(env, dtc_th, cb->dtc_cookie);
102                 if (rc < 0)
103                         break;
104         }
105         return rc;
106 }
107 EXPORT_SYMBOL(dt_txn_hook_start);
108
109 int dt_txn_hook_stop(const struct lu_env *env, struct thandle *th)
110 {
111         struct dt_device       *dev = th->th_dev;
112         struct dt_txn_callback *cb;
113         int                     rc = 0;
114
115         if (th->th_local)
116                 return 0;
117
118         list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
119                 struct thandle *dtc_th = th;
120
121                 if (cb->dtc_txn_stop == NULL ||
122                     !(cb->dtc_tag & env->le_ctx.lc_tags))
123                         continue;
124
125                 /* Usually dt_txn_hook_stop is called from bottom device,
126                  * and if the thandle has th_top, then we need use top
127                  * thandle for the callback in the top thandle layer */
128                 if (th->th_top != NULL)
129                         dtc_th = th->th_top;
130
131                 rc = cb->dtc_txn_stop(env, dtc_th, cb->dtc_cookie);
132                 if (rc < 0)
133                         break;
134         }
135         return rc;
136 }
137 EXPORT_SYMBOL(dt_txn_hook_stop);
138
139 void dt_txn_hook_commit(struct thandle *th)
140 {
141         struct dt_txn_callback *cb;
142
143         if (th->th_local)
144                 return;
145
146         list_for_each_entry(cb, &th->th_dev->dd_txn_callbacks,
147                             dtc_linkage) {
148                 /* Right now, the bottom device (OSD) will use this hook
149                  * commit to notify OSP, so we do not check and replace
150                  * the thandle to top thandle now */
151                 if (cb->dtc_txn_commit)
152                         cb->dtc_txn_commit(th, cb->dtc_cookie);
153         }
154 }
155 EXPORT_SYMBOL(dt_txn_hook_commit);
156
157 int dt_device_init(struct dt_device *dev, struct lu_device_type *t)
158 {
159         INIT_LIST_HEAD(&dev->dd_txn_callbacks);
160         return lu_device_init(&dev->dd_lu_dev, t);
161 }
162 EXPORT_SYMBOL(dt_device_init);
163
164 void dt_device_fini(struct dt_device *dev)
165 {
166         lu_device_fini(&dev->dd_lu_dev);
167 }
168 EXPORT_SYMBOL(dt_device_fini);
169
170 int dt_object_init(struct dt_object *obj,
171                    struct lu_object_header *h, struct lu_device *d)
172
173 {
174         return lu_object_init(&obj->do_lu, h, d);
175 }
176 EXPORT_SYMBOL(dt_object_init);
177
178 void dt_object_fini(struct dt_object *obj)
179 {
180         lu_object_fini(&obj->do_lu);
181 }
182 EXPORT_SYMBOL(dt_object_fini);
183
184 int dt_try_as_dir(const struct lu_env *env, struct dt_object *obj)
185 {
186         if (obj->do_index_ops == NULL)
187                 obj->do_ops->do_index_try(env, obj, &dt_directory_features);
188         return obj->do_index_ops != NULL;
189 }
190 EXPORT_SYMBOL(dt_try_as_dir);
191
192 enum dt_format_type dt_mode_to_dft(__u32 mode)
193 {
194         enum dt_format_type result;
195
196         switch (mode & S_IFMT) {
197         case S_IFDIR:
198                 result = DFT_DIR;
199                 break;
200         case S_IFREG:
201                 result = DFT_REGULAR;
202                 break;
203         case S_IFLNK:
204                 result = DFT_SYM;
205                 break;
206         case S_IFCHR:
207         case S_IFBLK:
208         case S_IFIFO:
209         case S_IFSOCK:
210                 result = DFT_NODE;
211                 break;
212         default:
213                 LBUG();
214                 break;
215         }
216         return result;
217 }
218 EXPORT_SYMBOL(dt_mode_to_dft);
219
220 /**
221  * lookup fid for object named \a name in directory \a dir.
222  */
223
224 int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir,
225                   const char *name, struct lu_fid *fid)
226 {
227         if (dt_try_as_dir(env, dir))
228                 return dt_lookup(env, dir, (struct dt_rec *)fid,
229                                  (const struct dt_key *)name);
230         return -ENOTDIR;
231 }
232 EXPORT_SYMBOL(dt_lookup_dir);
233
234 /* this differs from dt_locate by top_dev as parameter
235  * but not one from lu_site */
236 struct dt_object *dt_locate_at(const struct lu_env *env,
237                                struct dt_device *dev,
238                                const struct lu_fid *fid,
239                                struct lu_device *top_dev,
240                                const struct lu_object_conf *conf)
241 {
242         struct lu_object *lo;
243         struct lu_object *n;
244
245         lo = lu_object_find_at(env, top_dev, fid, conf);
246         if (IS_ERR(lo))
247                 return ERR_PTR(PTR_ERR(lo));
248
249         LASSERT(lo != NULL);
250
251         list_for_each_entry(n, &lo->lo_header->loh_layers, lo_linkage) {
252                 if (n->lo_dev == &dev->dd_lu_dev)
253                         return container_of0(n, struct dt_object, do_lu);
254         }
255
256         return ERR_PTR(-ENOENT);
257 }
258 EXPORT_SYMBOL(dt_locate_at);
259
260 /**
261  * find an object named \a entry in given \a dfh->dfh_o directory.
262  */
263 static int dt_find_entry(const struct lu_env *env, const char *entry, void *data)
264 {
265         struct dt_find_hint  *dfh = data;
266         struct dt_device     *dt = dfh->dfh_dt;
267         struct lu_fid        *fid = dfh->dfh_fid;
268         struct dt_object     *obj = dfh->dfh_o;
269         int                   result;
270
271         result = dt_lookup_dir(env, obj, entry, fid);
272         lu_object_put(env, &obj->do_lu);
273         if (result == 0) {
274                 obj = dt_locate(env, dt, fid);
275                 if (IS_ERR(obj))
276                         result = PTR_ERR(obj);
277         }
278         dfh->dfh_o = obj;
279         return result;
280 }
281
282 /**
283  * Abstract function which parses path name. This function feeds
284  * path component to \a entry_func.
285  */
286 int dt_path_parser(const struct lu_env *env,
287                    char *path, dt_entry_func_t entry_func,
288                    void *data)
289 {
290         char *e;
291         int rc = 0;
292
293         while (1) {
294                 e = strsep(&path, "/");
295                 if (e == NULL)
296                         break;
297
298                 if (e[0] == 0) {
299                         if (!path || path[0] == '\0')
300                                 break;
301                         continue;
302                 }
303                 rc = entry_func(env, e, data);
304                 if (rc)
305                         break;
306         }
307
308         return rc;
309 }
310
311 struct dt_object *
312 dt_store_resolve(const struct lu_env *env, struct dt_device *dt,
313                  const char *path, struct lu_fid *fid)
314 {
315         struct dt_thread_info *info = dt_info(env);
316         struct dt_find_hint   *dfh = &info->dti_dfh;
317         struct dt_object      *obj;
318         int                    result;
319
320
321         dfh->dfh_dt = dt;
322         dfh->dfh_fid = fid;
323
324         strlcpy(info->dti_buf, path, sizeof(info->dti_buf));
325
326         result = dt->dd_ops->dt_root_get(env, dt, fid);
327         if (result == 0) {
328                 obj = dt_locate(env, dt, fid);
329                 if (!IS_ERR(obj)) {
330                         dfh->dfh_o = obj;
331                         result = dt_path_parser(env, info->dti_buf,
332                                                 dt_find_entry, dfh);
333                         if (result != 0)
334                                 obj = ERR_PTR(result);
335                         else
336                                 obj = dfh->dfh_o;
337                 }
338         } else {
339                 obj = ERR_PTR(result);
340         }
341         return obj;
342 }
343
344 static struct dt_object *dt_reg_open(const struct lu_env *env,
345                                      struct dt_device *dt,
346                                      struct dt_object *p,
347                                      const char *name,
348                                      struct lu_fid *fid)
349 {
350         struct dt_object *o;
351         int result;
352
353         result = dt_lookup_dir(env, p, name, fid);
354         if (result == 0){
355                 o = dt_locate(env, dt, fid);
356         }
357         else
358                 o = ERR_PTR(result);
359
360         return o;
361 }
362
363 /**
364  * Open dt object named \a filename from \a dirname directory.
365  *      \param  dt      dt device
366  *      \param  fid     on success, object fid is stored in *fid
367  */
368 struct dt_object *dt_store_open(const struct lu_env *env,
369                                 struct dt_device *dt,
370                                 const char *dirname,
371                                 const char *filename,
372                                 struct lu_fid *fid)
373 {
374         struct dt_object *file;
375         struct dt_object *dir;
376
377         dir = dt_store_resolve(env, dt, dirname, fid);
378         if (!IS_ERR(dir)) {
379                 file = dt_reg_open(env, dt, dir,
380                                    filename, fid);
381                 lu_object_put(env, &dir->do_lu);
382         } else {
383                 file = dir;
384         }
385         return file;
386 }
387
388 struct dt_object *dt_find_or_create(const struct lu_env *env,
389                                     struct dt_device *dt,
390                                     const struct lu_fid *fid,
391                                     struct dt_object_format *dof,
392                                     struct lu_attr *at)
393 {
394         struct dt_object *dto;
395         struct thandle *th;
396         int rc;
397
398         ENTRY;
399
400         dto = dt_locate(env, dt, fid);
401         if (IS_ERR(dto))
402                 RETURN(dto);
403
404         LASSERT(dto != NULL);
405         if (dt_object_exists(dto))
406                 RETURN(dto);
407
408         th = dt_trans_create(env, dt);
409         if (IS_ERR(th))
410                 GOTO(out, rc = PTR_ERR(th));
411
412         rc = dt_declare_create(env, dto, at, NULL, dof, th);
413         if (rc)
414                 GOTO(trans_stop, rc);
415
416         rc = dt_trans_start_local(env, dt, th);
417         if (rc)
418                 GOTO(trans_stop, rc);
419
420         dt_write_lock(env, dto, 0);
421         if (dt_object_exists(dto))
422                 GOTO(unlock, rc = 0);
423
424         CDEBUG(D_OTHER, "create new object "DFID"\n", PFID(fid));
425
426         rc = dt_create(env, dto, at, NULL, dof, th);
427         if (rc)
428                 GOTO(unlock, rc);
429         LASSERT(dt_object_exists(dto));
430 unlock:
431         dt_write_unlock(env, dto);
432 trans_stop:
433         dt_trans_stop(env, dt, th);
434 out:
435         if (rc) {
436                 lu_object_put(env, &dto->do_lu);
437                 RETURN(ERR_PTR(rc));
438         }
439         RETURN(dto);
440 }
441 EXPORT_SYMBOL(dt_find_or_create);
442
443 /* dt class init function. */
444 int dt_global_init(void)
445 {
446         int result;
447
448         LU_CONTEXT_KEY_INIT(&dt_key);
449         result = lu_context_key_register(&dt_key);
450         return result;
451 }
452
453 void dt_global_fini(void)
454 {
455         lu_context_key_degister(&dt_key);
456 }
457
458 /**
459  * Generic read helper. May return an error for partial reads.
460  *
461  * \param env  lustre environment
462  * \param dt   object to be read
463  * \param buf  lu_buf to be filled, with buffer pointer and length
464  * \param pos position to start reading, updated as data is read
465  *
466  * \retval real size of data read
467  * \retval -ve errno on failure
468  */
469 int dt_read(const struct lu_env *env, struct dt_object *dt,
470             struct lu_buf *buf, loff_t *pos)
471 {
472         LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
473         return dt->do_body_ops->dbo_read(env, dt, buf, pos);
474 }
475 EXPORT_SYMBOL(dt_read);
476
477 /**
478  * Read structures of fixed size from storage.  Unlike dt_read(), using
479  * dt_record_read() will return an error for partial reads.
480  *
481  * \param env  lustre environment
482  * \param dt   object to be read
483  * \param buf  lu_buf to be filled, with buffer pointer and length
484  * \param pos position to start reading, updated as data is read
485  *
486  * \retval 0 on successfully reading full buffer
487  * \retval -EFAULT on short read
488  * \retval -ve errno on failure
489  */
490 int dt_record_read(const struct lu_env *env, struct dt_object *dt,
491                    struct lu_buf *buf, loff_t *pos)
492 {
493         ssize_t size;
494
495         LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
496
497         size = dt->do_body_ops->dbo_read(env, dt, buf, pos);
498         if (size < 0)
499                 return size;
500         return (size == (ssize_t)buf->lb_len) ? 0 : -EFAULT;
501 }
502 EXPORT_SYMBOL(dt_record_read);
503
504 int dt_record_write(const struct lu_env *env, struct dt_object *dt,
505                     const struct lu_buf *buf, loff_t *pos, struct thandle *th)
506 {
507         ssize_t size;
508
509         LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
510         LASSERT(th != NULL);
511         LASSERT(dt->do_body_ops);
512         LASSERT(dt->do_body_ops->dbo_write);
513
514         size = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, 1);
515         if (size < 0)
516                 return size;
517         return (size == (ssize_t)buf->lb_len) ? 0 : -EFAULT;
518 }
519 EXPORT_SYMBOL(dt_record_write);
520
521 int dt_declare_version_set(const struct lu_env *env, struct dt_object *o,
522                            struct thandle *th)
523 {
524         struct lu_buf vbuf;
525         char *xname = XATTR_NAME_VERSION;
526
527         LASSERT(o);
528         vbuf.lb_buf = NULL;
529         vbuf.lb_len = sizeof(dt_obj_version_t);
530         return dt_declare_xattr_set(env, o, &vbuf, xname, 0, th);
531
532 }
533 EXPORT_SYMBOL(dt_declare_version_set);
534
535 void dt_version_set(const struct lu_env *env, struct dt_object *o,
536                     dt_obj_version_t version, struct thandle *th)
537 {
538         struct lu_buf vbuf;
539         char *xname = XATTR_NAME_VERSION;
540         int rc;
541
542         LASSERT(o);
543         vbuf.lb_buf = &version;
544         vbuf.lb_len = sizeof(version);
545
546         rc = dt_xattr_set(env, o, &vbuf, xname, 0, th);
547         if (rc < 0)
548                 CDEBUG(D_INODE, "Can't set version, rc %d\n", rc);
549         return;
550 }
551 EXPORT_SYMBOL(dt_version_set);
552
553 dt_obj_version_t dt_version_get(const struct lu_env *env, struct dt_object *o)
554 {
555         struct lu_buf vbuf;
556         char *xname = XATTR_NAME_VERSION;
557         dt_obj_version_t version;
558         int rc;
559
560         LASSERT(o);
561         vbuf.lb_buf = &version;
562         vbuf.lb_len = sizeof(version);
563         rc = dt_xattr_get(env, o, &vbuf, xname);
564         if (rc != sizeof(version)) {
565                 CDEBUG(D_INODE, "Can't get version, rc %d\n", rc);
566                 version = 0;
567         }
568         return version;
569 }
570 EXPORT_SYMBOL(dt_version_get);
571
572 /* list of all supported index types */
573
574 /* directories */
575 const struct dt_index_features dt_directory_features;
576 EXPORT_SYMBOL(dt_directory_features);
577
578 /* scrub iterator */
579 const struct dt_index_features dt_otable_features;
580 EXPORT_SYMBOL(dt_otable_features);
581
582 /* lfsck orphan */
583 const struct dt_index_features dt_lfsck_orphan_features = {
584         .dif_flags              = 0,
585         .dif_keysize_min        = sizeof(struct lu_fid),
586         .dif_keysize_max        = sizeof(struct lu_fid),
587         .dif_recsize_min        = sizeof(struct lu_orphan_rec),
588         .dif_recsize_max        = sizeof(struct lu_orphan_rec),
589         .dif_ptrsize            = 4
590 };
591 EXPORT_SYMBOL(dt_lfsck_orphan_features);
592
593 /* lfsck */
594 const struct dt_index_features dt_lfsck_features = {
595         .dif_flags              = DT_IND_UPDATE,
596         .dif_keysize_min        = sizeof(struct lu_fid),
597         .dif_keysize_max        = sizeof(struct lu_fid),
598         .dif_recsize_min        = sizeof(__u8),
599         .dif_recsize_max        = sizeof(__u8),
600         .dif_ptrsize            = 4
601 };
602 EXPORT_SYMBOL(dt_lfsck_features);
603
604 /* accounting indexes */
605 const struct dt_index_features dt_acct_features = {
606         .dif_flags              = DT_IND_UPDATE,
607         .dif_keysize_min        = sizeof(__u64), /* 64-bit uid/gid */
608         .dif_keysize_max        = sizeof(__u64), /* 64-bit uid/gid */
609         .dif_recsize_min        = sizeof(struct lquota_acct_rec), /* 16 bytes */
610         .dif_recsize_max        = sizeof(struct lquota_acct_rec), /* 16 bytes */
611         .dif_ptrsize            = 4
612 };
613 EXPORT_SYMBOL(dt_acct_features);
614
615 /* global quota files */
616 const struct dt_index_features dt_quota_glb_features = {
617         .dif_flags              = DT_IND_UPDATE,
618         /* a different key would have to be used for per-directory quota */
619         .dif_keysize_min        = sizeof(__u64), /* 64-bit uid/gid */
620         .dif_keysize_max        = sizeof(__u64), /* 64-bit uid/gid */
621         .dif_recsize_min        = sizeof(struct lquota_glb_rec), /* 32 bytes */
622         .dif_recsize_max        = sizeof(struct lquota_glb_rec), /* 32 bytes */
623         .dif_ptrsize            = 4
624 };
625 EXPORT_SYMBOL(dt_quota_glb_features);
626
627 /* slave quota files */
628 const struct dt_index_features dt_quota_slv_features = {
629         .dif_flags              = DT_IND_UPDATE,
630         /* a different key would have to be used for per-directory quota */
631         .dif_keysize_min        = sizeof(__u64), /* 64-bit uid/gid */
632         .dif_keysize_max        = sizeof(__u64), /* 64-bit uid/gid */
633         .dif_recsize_min        = sizeof(struct lquota_slv_rec), /* 8 bytes */
634         .dif_recsize_max        = sizeof(struct lquota_slv_rec), /* 8 bytes */
635         .dif_ptrsize            = 4
636 };
637 EXPORT_SYMBOL(dt_quota_slv_features);
638
639 /* helper function returning what dt_index_features structure should be used
640  * based on the FID sequence. This is used by OBD_IDX_READ RPC */
641 static inline const struct dt_index_features *dt_index_feat_select(__u64 seq,
642                                                                    __u32 mode)
643 {
644         if (seq == FID_SEQ_QUOTA_GLB) {
645                 /* global quota index */
646                 if (!S_ISREG(mode))
647                         /* global quota index should be a regular file */
648                         return ERR_PTR(-ENOENT);
649                 return &dt_quota_glb_features;
650         } else if (seq == FID_SEQ_QUOTA) {
651                 /* quota slave index */
652                 if (!S_ISREG(mode))
653                         /* slave index should be a regular file */
654                         return ERR_PTR(-ENOENT);
655                 return &dt_quota_slv_features;
656         } else if (seq == FID_SEQ_LAYOUT_RBTREE){
657                 return &dt_lfsck_orphan_features;
658         } else if (seq >= FID_SEQ_NORMAL) {
659                 /* object is part of the namespace, verify that it is a
660                  * directory */
661                 if (!S_ISDIR(mode))
662                         /* sorry, we can only deal with directory */
663                         return ERR_PTR(-ENOTDIR);
664                 return &dt_directory_features;
665         }
666
667         return ERR_PTR(-EOPNOTSUPP);
668 }
669
670 /*
671  * Fill a lu_idxpage with key/record pairs read for transfer via OBD_IDX_READ
672  * RPC
673  *
674  * \param env - is the environment passed by the caller
675  * \param lp  - is a pointer to the lu_page to fill
676  * \param nob - is the maximum number of bytes that should be copied
677  * \param iops - is the index operation vector associated with the index object
678  * \param it   - is a pointer to the current iterator
679  * \param attr - is the index attribute to pass to iops->rec()
680  * \param arg  - is a pointer to the idx_info structure
681  */
682 static int dt_index_page_build(const struct lu_env *env, union lu_page *lp,
683                                size_t nob, const struct dt_it_ops *iops,
684                                struct dt_it *it, __u32 attr, void *arg)
685 {
686         struct idx_info         *ii = (struct idx_info *)arg;
687         struct lu_idxpage       *lip = &lp->lp_idx;
688         char                    *entry;
689         size_t                   size;
690         int                      rc;
691         ENTRY;
692
693         if (nob < LIP_HDR_SIZE)
694                 return -EINVAL;
695
696         /* initialize the header of the new container */
697         memset(lip, 0, LIP_HDR_SIZE);
698         lip->lip_magic = LIP_MAGIC;
699         nob           -= LIP_HDR_SIZE;
700
701         /* compute size needed to store a key/record pair */
702         size = ii->ii_recsize + ii->ii_keysize;
703         if ((ii->ii_flags & II_FL_NOHASH) == 0)
704                 /* add hash if the client wants it */
705                 size += sizeof(__u64);
706
707         entry = lip->lip_entries;
708         do {
709                 char            *tmp_entry = entry;
710                 struct dt_key   *key;
711                 __u64           hash;
712                 __u16           keysize;
713                 __u16           recsize;
714
715                 /* fetch 64-bit hash value */
716                 hash = iops->store(env, it);
717                 ii->ii_hash_end = hash;
718
719                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_IDX_READ_BREAK)) {
720                         if (lip->lip_nr != 0)
721                                 GOTO(out, rc = 0);
722                 }
723
724                 if (nob < size) {
725                         if (lip->lip_nr == 0)
726                                 GOTO(out, rc = -EINVAL);
727                         GOTO(out, rc = 0);
728                 }
729
730                 if (!(ii->ii_flags & II_FL_NOHASH)) {
731                         /* client wants to the 64-bit hash value associated with
732                          * each record */
733                         memcpy(tmp_entry, &hash, sizeof(hash));
734                         tmp_entry += sizeof(hash);
735                 }
736
737                 if (ii->ii_flags & II_FL_VARKEY)
738                         keysize = iops->key_size(env, it);
739                 else
740                         keysize = ii->ii_keysize;
741
742                 if (!(ii->ii_flags & II_FL_NOKEY)) {
743                         /* then the key value */
744                         key = iops->key(env, it);
745                         memcpy(tmp_entry, key, keysize);
746                         tmp_entry += keysize;
747                 }
748
749                 /* and finally the record */
750                 rc = iops->rec(env, it, (struct dt_rec *)tmp_entry, attr);
751                 if (rc != -ESTALE) {
752                         if (rc != 0)
753                                 GOTO(out, rc);
754
755                         /* hash/key/record successfully copied! */
756                         lip->lip_nr++;
757                         if (unlikely(lip->lip_nr == 1 && ii->ii_count == 0))
758                                 ii->ii_hash_start = hash;
759
760                         if (ii->ii_flags & II_FL_VARREC)
761                                 recsize = iops->rec_size(env, it, attr);
762                         else
763                                 recsize = ii->ii_recsize;
764
765                         entry = tmp_entry + recsize;
766                         nob -= size;
767                 }
768
769                 /* move on to the next record */
770                 do {
771                         rc = iops->next(env, it);
772                 } while (rc == -ESTALE);
773
774         } while (rc == 0);
775
776         GOTO(out, rc);
777 out:
778         if (rc >= 0 && lip->lip_nr > 0)
779                 /* one more container */
780                 ii->ii_count++;
781         if (rc > 0)
782                 /* no more entries */
783                 ii->ii_hash_end = II_END_OFF;
784         return rc;
785 }
786
787
788 /*
789  * Walk index and fill lu_page containers with key/record pairs
790  *
791  * \param env - is the environment passed by the caller
792  * \param obj - is the index object to parse
793  * \param rdpg - is the lu_rdpg descriptor associated with the transfer
794  * \param filler - is the callback function responsible for filling a lu_page
795  *                 with key/record pairs in the format wanted by the caller
796  * \param arg    - is an opaq argument passed to the filler function
797  *
798  * \retval sum (in bytes) of all filled lu_pages
799  * \retval -ve errno on failure
800  */
801 int dt_index_walk(const struct lu_env *env, struct dt_object *obj,
802                   const struct lu_rdpg *rdpg, dt_index_page_build_t filler,
803                   void *arg)
804 {
805         struct dt_it            *it;
806         const struct dt_it_ops  *iops;
807         size_t                   pageidx, nob, nlupgs = 0;
808         int                      rc;
809         ENTRY;
810
811         LASSERT(rdpg->rp_pages != NULL);
812         LASSERT(obj->do_index_ops != NULL);
813
814         nob = rdpg->rp_count;
815         if (nob == 0)
816                 RETURN(-EFAULT);
817
818         /* Iterate through index and fill containers from @rdpg */
819         iops = &obj->do_index_ops->dio_it;
820         LASSERT(iops != NULL);
821         it = iops->init(env, obj, rdpg->rp_attrs);
822         if (IS_ERR(it))
823                 RETURN(PTR_ERR(it));
824
825         rc = iops->load(env, it, rdpg->rp_hash);
826         if (rc == 0) {
827                 /*
828                  * Iterator didn't find record with exactly the key requested.
829                  *
830                  * It is currently either
831                  *
832                  *     - positioned above record with key less than
833                  *     requested---skip it.
834                  *     - or not positioned at all (is in IAM_IT_SKEWED
835                  *     state)---position it on the next item.
836                  */
837                 rc = iops->next(env, it);
838         } else if (rc > 0) {
839                 rc = 0;
840         } else {
841                 if (rc == -ENODATA)
842                         rc = 0;
843                 GOTO(out, rc);
844         }
845
846         /* Fill containers one after the other. There might be multiple
847          * containers per physical page.
848          *
849          * At this point and across for-loop:
850          *  rc == 0 -> ok, proceed.
851          *  rc >  0 -> end of index.
852          *  rc <  0 -> error. */
853         for (pageidx = 0; rc == 0 && nob > 0; pageidx++) {
854                 union lu_page   *lp;
855                 int              i;
856
857                 LASSERT(pageidx < rdpg->rp_npages);
858                 lp = kmap(rdpg->rp_pages[pageidx]);
859
860                 /* fill lu pages */
861                 for (i = 0; i < LU_PAGE_COUNT; i++, lp++, nob -= LU_PAGE_SIZE) {
862                         rc = filler(env, lp, min_t(size_t, nob, LU_PAGE_SIZE),
863                                     iops, it, rdpg->rp_attrs, arg);
864                         if (rc < 0)
865                                 break;
866                         /* one more lu_page */
867                         nlupgs++;
868                         if (rc > 0)
869                                 /* end of index */
870                                 break;
871                 }
872                 kunmap(rdpg->rp_pages[i]);
873         }
874
875 out:
876         iops->put(env, it);
877         iops->fini(env, it);
878
879         if (rc >= 0)
880                 rc = min_t(size_t, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
881
882         RETURN(rc);
883 }
884 EXPORT_SYMBOL(dt_index_walk);
885
886 /**
887  * Walk key/record pairs of an index and copy them into 4KB containers to be
888  * transferred over the network. This is the common handler for OBD_IDX_READ
889  * RPC processing.
890  *
891  * \param env - is the environment passed by the caller
892  * \param dev - is the dt_device storing the index
893  * \param ii  - is the idx_info structure packed by the client in the
894  *              OBD_IDX_READ request
895  * \param rdpg - is the lu_rdpg descriptor
896  *
897  * \retval on success, return sum (in bytes) of all filled containers
898  * \retval appropriate error otherwise.
899  */
900 int dt_index_read(const struct lu_env *env, struct dt_device *dev,
901                   struct idx_info *ii, const struct lu_rdpg *rdpg)
902 {
903         const struct dt_index_features  *feat;
904         struct dt_object                *obj;
905         int                              rc;
906         ENTRY;
907
908         /* rp_count shouldn't be null and should be a multiple of the container
909          * size */
910         if (rdpg->rp_count == 0 || (rdpg->rp_count & (LU_PAGE_SIZE - 1)) != 0)
911                 RETURN(-EFAULT);
912
913         if (!fid_is_quota(&ii->ii_fid) && !fid_is_layout_rbtree(&ii->ii_fid) &&
914             !fid_is_norm(&ii->ii_fid))
915                 RETURN(-EOPNOTSUPP);
916
917         /* lookup index object subject to the transfer */
918         obj = dt_locate(env, dev, &ii->ii_fid);
919         if (IS_ERR(obj))
920                 RETURN(PTR_ERR(obj));
921         if (dt_object_exists(obj) == 0)
922                 GOTO(out, rc = -ENOENT);
923
924         /* fetch index features associated with index object */
925         feat = dt_index_feat_select(fid_seq(&ii->ii_fid),
926                                     lu_object_attr(&obj->do_lu));
927         if (IS_ERR(feat))
928                 GOTO(out, rc = PTR_ERR(feat));
929
930         /* load index feature if not done already */
931         if (obj->do_index_ops == NULL) {
932                 rc = obj->do_ops->do_index_try(env, obj, feat);
933                 if (rc)
934                         GOTO(out, rc);
935         }
936
937         /* fill ii_flags with supported index features */
938         ii->ii_flags &= (II_FL_NOHASH | II_FL_NOKEY | II_FL_VARKEY |
939                          II_FL_VARREC);
940
941         if (!(feat->dif_flags & DT_IND_VARKEY))
942                 ii->ii_keysize = feat->dif_keysize_max;
943
944         if (!(feat->dif_flags & DT_IND_VARREC))
945                 ii->ii_recsize = feat->dif_recsize_max;
946
947         if (feat->dif_flags & DT_IND_NONUNQ)
948                 /* key isn't necessarily unique */
949                 ii->ii_flags |= II_FL_NONUNQ;
950
951         if (!fid_is_layout_rbtree(&ii->ii_fid)) {
952                 dt_read_lock(env, obj, 0);
953                 /* fetch object version before walking the index */
954                 ii->ii_version = dt_version_get(env, obj);
955         }
956
957         /* walk the index and fill lu_idxpages with key/record pairs */
958         rc = dt_index_walk(env, obj, rdpg, dt_index_page_build, ii);
959         if (!fid_is_layout_rbtree(&ii->ii_fid))
960                 dt_read_unlock(env, obj);
961
962         if (rc == 0) {
963                 /* index is empty */
964                 LASSERT(ii->ii_count == 0);
965                 ii->ii_hash_end = II_END_OFF;
966         }
967
968         GOTO(out, rc);
969 out:
970         lu_object_put(env, &obj->do_lu);
971         return rc;
972 }
973 EXPORT_SYMBOL(dt_index_read);
974
975 #ifdef CONFIG_PROC_FS
976 int lprocfs_dt_blksize_seq_show(struct seq_file *m, void *v)
977 {
978         struct dt_device *dt = m->private;
979         struct obd_statfs osfs;
980
981         int rc = dt_statfs(NULL, dt, &osfs);
982         if (rc == 0)
983                 seq_printf(m, "%u\n", (unsigned) osfs.os_bsize);
984         return rc;
985 }
986 EXPORT_SYMBOL(lprocfs_dt_blksize_seq_show);
987
988 int lprocfs_dt_kbytestotal_seq_show(struct seq_file *m, void *v)
989 {
990         struct dt_device *dt = m->private;
991         struct obd_statfs osfs;
992
993         int rc = dt_statfs(NULL, dt, &osfs);
994         if (rc == 0) {
995                 __u32 blk_size = osfs.os_bsize >> 10;
996                 __u64 result = osfs.os_blocks;
997
998                 while (blk_size >>= 1)
999                         result <<= 1;
1000
1001                 seq_printf(m, LPU64"\n", result);
1002         }
1003         return rc;
1004 }
1005 EXPORT_SYMBOL(lprocfs_dt_kbytestotal_seq_show);
1006
1007 int lprocfs_dt_kbytesfree_seq_show(struct seq_file *m, void *v)
1008 {
1009         struct dt_device *dt = m->private;
1010         struct obd_statfs osfs;
1011
1012         int rc = dt_statfs(NULL, dt, &osfs);
1013         if (rc == 0) {
1014                 __u32 blk_size = osfs.os_bsize >> 10;
1015                 __u64 result = osfs.os_bfree;
1016
1017                 while (blk_size >>= 1)
1018                         result <<= 1;
1019
1020                 seq_printf(m, LPU64"\n", result);
1021         }
1022         return rc;
1023 }
1024 EXPORT_SYMBOL(lprocfs_dt_kbytesfree_seq_show);
1025
1026 int lprocfs_dt_kbytesavail_seq_show(struct seq_file *m, void *v)
1027 {
1028         struct dt_device *dt = m->private;
1029         struct obd_statfs osfs;
1030
1031         int rc = dt_statfs(NULL, dt, &osfs);
1032         if (rc == 0) {
1033                 __u32 blk_size = osfs.os_bsize >> 10;
1034                 __u64 result = osfs.os_bavail;
1035
1036                 while (blk_size >>= 1)
1037                         result <<= 1;
1038
1039                 seq_printf(m, LPU64"\n", result);
1040         }
1041         return rc;
1042 }
1043 EXPORT_SYMBOL(lprocfs_dt_kbytesavail_seq_show);
1044
1045 int lprocfs_dt_filestotal_seq_show(struct seq_file *m, void *v)
1046 {
1047         struct dt_device *dt = m->private;
1048         struct obd_statfs osfs;
1049
1050         int rc = dt_statfs(NULL, dt, &osfs);
1051         if (rc == 0)
1052                 seq_printf(m, LPU64"\n", osfs.os_files);
1053         return rc;
1054 }
1055 EXPORT_SYMBOL(lprocfs_dt_filestotal_seq_show);
1056
1057 int lprocfs_dt_filesfree_seq_show(struct seq_file *m, void *v)
1058 {
1059         struct dt_device *dt = m->private;
1060         struct obd_statfs osfs;
1061
1062         int rc = dt_statfs(NULL, dt, &osfs);
1063         if (rc == 0)
1064                 seq_printf(m, LPU64"\n", osfs.os_ffree);
1065         return rc;
1066 }
1067 EXPORT_SYMBOL(lprocfs_dt_filesfree_seq_show);
1068
1069 #endif /* CONFIG_PROC_FS */