Whamcloud - gitweb
LU-957 scrub: osd-ldiskfs itable based iteration
[fs/lustre-release.git] / lustre / obdclass / dt_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/dt_object.c
37  *
38  * Dt Object.
39  * Generic functions from dt_object.h
40  *
41  * Author: Nikita Danilov <nikita@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_CLASS
45
46 #include <obd.h>
47 #include <dt_object.h>
48 #include <libcfs/list.h>
49 /* fid_be_to_cpu() */
50 #include <lustre_fid.h>
51
52 struct dt_find_hint {
53         struct lu_fid        *dfh_fid;
54         struct dt_device     *dfh_dt;
55         struct dt_object     *dfh_o;
56 };
57
58 struct dt_thread_info {
59         char                    dti_buf[DT_MAX_PATH];
60         struct dt_find_hint     dti_dfh;
61 };
62
63 /* context key constructor/destructor: dt_global_key_init, dt_global_key_fini */
64 LU_KEY_INIT(dt_global, struct dt_thread_info);
65 LU_KEY_FINI(dt_global, struct dt_thread_info);
66
67 static struct lu_context_key dt_key = {
68         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD | LCT_LOCAL,
69         .lct_init = dt_global_key_init,
70         .lct_fini = dt_global_key_fini
71 };
72
73 /* no lock is necessary to protect the list, because call-backs
74  * are added during system startup. Please refer to "struct dt_device".
75  */
76 void dt_txn_callback_add(struct dt_device *dev, struct dt_txn_callback *cb)
77 {
78         cfs_list_add(&cb->dtc_linkage, &dev->dd_txn_callbacks);
79 }
80 EXPORT_SYMBOL(dt_txn_callback_add);
81
82 void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb)
83 {
84         cfs_list_del_init(&cb->dtc_linkage);
85 }
86 EXPORT_SYMBOL(dt_txn_callback_del);
87
88 int dt_txn_hook_start(const struct lu_env *env,
89                       struct dt_device *dev, struct thandle *th)
90 {
91         int rc = 0;
92         struct dt_txn_callback *cb;
93
94         if (th->th_local)
95                 return 0;
96
97         cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
98                 if (cb->dtc_txn_start == NULL ||
99                     !(cb->dtc_tag & env->le_ctx.lc_tags))
100                         continue;
101                 rc = cb->dtc_txn_start(env, th, cb->dtc_cookie);
102                 if (rc < 0)
103                         break;
104         }
105         return rc;
106 }
107 EXPORT_SYMBOL(dt_txn_hook_start);
108
109 int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn)
110 {
111         struct dt_device       *dev = txn->th_dev;
112         struct dt_txn_callback *cb;
113         int                     rc = 0;
114
115         if (txn->th_local)
116                 return 0;
117
118         cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
119                 if (cb->dtc_txn_stop == NULL ||
120                     !(cb->dtc_tag & env->le_ctx.lc_tags))
121                         continue;
122                 rc = cb->dtc_txn_stop(env, txn, cb->dtc_cookie);
123                 if (rc < 0)
124                         break;
125         }
126         return rc;
127 }
128 EXPORT_SYMBOL(dt_txn_hook_stop);
129
130 void dt_txn_hook_commit(struct thandle *txn)
131 {
132         struct dt_txn_callback *cb;
133
134         if (txn->th_local)
135                 return;
136
137         cfs_list_for_each_entry(cb, &txn->th_dev->dd_txn_callbacks,
138                                 dtc_linkage) {
139                 if (cb->dtc_txn_commit)
140                         cb->dtc_txn_commit(txn, cb->dtc_cookie);
141         }
142 }
143 EXPORT_SYMBOL(dt_txn_hook_commit);
144
145 int dt_device_init(struct dt_device *dev, struct lu_device_type *t)
146 {
147
148         CFS_INIT_LIST_HEAD(&dev->dd_txn_callbacks);
149         return lu_device_init(&dev->dd_lu_dev, t);
150 }
151 EXPORT_SYMBOL(dt_device_init);
152
153 void dt_device_fini(struct dt_device *dev)
154 {
155         lu_device_fini(&dev->dd_lu_dev);
156 }
157 EXPORT_SYMBOL(dt_device_fini);
158
159 int dt_object_init(struct dt_object *obj,
160                    struct lu_object_header *h, struct lu_device *d)
161
162 {
163         return lu_object_init(&obj->do_lu, h, d);
164 }
165 EXPORT_SYMBOL(dt_object_init);
166
167 void dt_object_fini(struct dt_object *obj)
168 {
169         lu_object_fini(&obj->do_lu);
170 }
171 EXPORT_SYMBOL(dt_object_fini);
172
173 int dt_try_as_dir(const struct lu_env *env, struct dt_object *obj)
174 {
175         if (obj->do_index_ops == NULL)
176                 obj->do_ops->do_index_try(env, obj, &dt_directory_features);
177         return obj->do_index_ops != NULL;
178 }
179 EXPORT_SYMBOL(dt_try_as_dir);
180
181 enum dt_format_type dt_mode_to_dft(__u32 mode)
182 {
183         enum dt_format_type result;
184
185         switch (mode & S_IFMT) {
186         case S_IFDIR:
187                 result = DFT_DIR;
188                 break;
189         case S_IFREG:
190                 result = DFT_REGULAR;
191                 break;
192         case S_IFLNK:
193                 result = DFT_SYM;
194                 break;
195         case S_IFCHR:
196         case S_IFBLK:
197         case S_IFIFO:
198         case S_IFSOCK:
199                 result = DFT_NODE;
200                 break;
201         default:
202                 LBUG();
203                 break;
204         }
205         return result;
206 }
207 EXPORT_SYMBOL(dt_mode_to_dft);
208
209 /**
210  * lookup fid for object named \a name in directory \a dir.
211  */
212
213 int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir,
214                   const char *name, struct lu_fid *fid)
215 {
216         if (dt_try_as_dir(env, dir))
217                 return dt_lookup(env, dir, (struct dt_rec *)fid,
218                                  (const struct dt_key *)name, BYPASS_CAPA);
219         return -ENOTDIR;
220 }
221 EXPORT_SYMBOL(dt_lookup_dir);
222 /**
223  * get object for given \a fid.
224  */
225 struct dt_object *dt_locate(const struct lu_env *env,
226                             struct dt_device *dev,
227                             const struct lu_fid *fid)
228 {
229         struct lu_object *obj;
230         struct dt_object *dt;
231
232         obj = lu_object_find(env, &dev->dd_lu_dev, fid, NULL);
233         if (!IS_ERR(obj)) {
234                 obj = lu_object_locate(obj->lo_header, dev->dd_lu_dev.ld_type);
235                 LASSERT(obj != NULL);
236                 dt = container_of(obj, struct dt_object, do_lu);
237         } else
238                 dt = (struct dt_object *)obj;
239         return dt;
240 }
241 EXPORT_SYMBOL(dt_locate);
242
243 /**
244  * find a object named \a entry in given \a dfh->dfh_o directory.
245  */
246 static int dt_find_entry(const struct lu_env *env, const char *entry, void *data)
247 {
248         struct dt_find_hint  *dfh = data;
249         struct dt_device     *dt = dfh->dfh_dt;
250         struct lu_fid        *fid = dfh->dfh_fid;
251         struct dt_object     *obj = dfh->dfh_o;
252         int                   result;
253
254         result = dt_lookup_dir(env, obj, entry, fid);
255         lu_object_put(env, &obj->do_lu);
256         if (result == 0) {
257                 obj = dt_locate(env, dt, fid);
258                 if (IS_ERR(obj))
259                         result = PTR_ERR(obj);
260         }
261         dfh->dfh_o = obj;
262         return result;
263 }
264
265 /**
266  * Abstract function which parses path name. This function feeds
267  * path component to \a entry_func.
268  */
269 int dt_path_parser(const struct lu_env *env,
270                    char *path, dt_entry_func_t entry_func,
271                    void *data)
272 {
273         char *e;
274         int rc = 0;
275
276         while (1) {
277                 e = strsep(&path, "/");
278                 if (e == NULL)
279                         break;
280
281                 if (e[0] == 0) {
282                         if (!path || path[0] == '\0')
283                                 break;
284                         continue;
285                 }
286                 rc = entry_func(env, e, data);
287                 if (rc)
288                         break;
289         }
290
291         return rc;
292 }
293
294 static struct dt_object *dt_store_resolve(const struct lu_env *env,
295                                           struct dt_device *dt,
296                                           const char *path,
297                                           struct lu_fid *fid)
298 {
299         struct dt_thread_info *info = lu_context_key_get(&env->le_ctx,
300                                                          &dt_key);
301         struct dt_find_hint *dfh = &info->dti_dfh;
302         struct dt_object     *obj;
303         char *local = info->dti_buf;
304         int result;
305
306         dfh->dfh_dt = dt;
307         dfh->dfh_fid = fid;
308
309         strncpy(local, path, DT_MAX_PATH);
310         local[DT_MAX_PATH - 1] = '\0';
311
312         result = dt->dd_ops->dt_root_get(env, dt, fid);
313         if (result == 0) {
314                 obj = dt_locate(env, dt, fid);
315                 if (!IS_ERR(obj)) {
316                         dfh->dfh_o = obj;
317                         result = dt_path_parser(env, local, dt_find_entry, dfh);
318                         if (result != 0)
319                                 obj = ERR_PTR(result);
320                         else
321                                 obj = dfh->dfh_o;
322                 }
323         } else {
324                 obj = ERR_PTR(result);
325         }
326         return obj;
327 }
328
329 static struct dt_object *dt_reg_open(const struct lu_env *env,
330                                      struct dt_device *dt,
331                                      struct dt_object *p,
332                                      const char *name,
333                                      struct lu_fid *fid)
334 {
335         struct dt_object *o;
336         int result;
337
338         result = dt_lookup_dir(env, p, name, fid);
339         if (result == 0){
340                 o = dt_locate(env, dt, fid);
341         }
342         else
343                 o = ERR_PTR(result);
344
345         return o;
346 }
347
348 /**
349  * Open dt object named \a filename from \a dirname directory.
350  *      \param  dt      dt device
351  *      \param  fid     on success, object fid is stored in *fid
352  */
353 struct dt_object *dt_store_open(const struct lu_env *env,
354                                 struct dt_device *dt,
355                                 const char *dirname,
356                                 const char *filename,
357                                 struct lu_fid *fid)
358 {
359         struct dt_object *file;
360         struct dt_object *dir;
361
362         dir = dt_store_resolve(env, dt, dirname, fid);
363         if (!IS_ERR(dir)) {
364                 file = dt_reg_open(env, dt, dir,
365                                    filename, fid);
366                 lu_object_put(env, &dir->do_lu);
367         } else {
368                 file = dir;
369         }
370         return file;
371 }
372 EXPORT_SYMBOL(dt_store_open);
373
374 struct dt_object *dt_find_or_create(const struct lu_env *env,
375                                     struct dt_device *dt,
376                                     const struct lu_fid *fid,
377                                     struct dt_object_format *dof,
378                                     struct lu_attr *at)
379 {
380         struct dt_object *dto;
381         struct thandle *th;
382         int rc;
383
384         ENTRY;
385
386         dto = dt_locate(env, dt, fid);
387         if (IS_ERR(dto))
388                 RETURN(dto);
389
390         LASSERT(dto != NULL);
391         if (dt_object_exists(dto))
392                 RETURN(dto);
393
394         th = dt_trans_create(env, dt);
395         if (IS_ERR(th))
396                 GOTO(out, rc = PTR_ERR(th));
397
398         rc = dt_declare_create(env, dto, at, NULL, dof, th);
399         if (rc)
400                 GOTO(trans_stop, rc);
401
402         rc = dt_trans_start_local(env, dt, th);
403         if (rc)
404                 GOTO(trans_stop, rc);
405
406         dt_write_lock(env, dto, 0);
407         if (dt_object_exists(dto))
408                 GOTO(unlock, rc = 0);
409
410         CDEBUG(D_OTHER, "create new object "DFID"\n", PFID(fid));
411
412         rc = dt_create(env, dto, at, NULL, dof, th);
413         if (rc)
414                 GOTO(unlock, rc);
415         LASSERT(dt_object_exists(dto));
416 unlock:
417         dt_write_unlock(env, dto);
418 trans_stop:
419         dt_trans_stop(env, dt, th);
420 out:
421         if (rc) {
422                 lu_object_put(env, &dto->do_lu);
423                 RETURN(ERR_PTR(rc));
424         }
425         RETURN(dto);
426 }
427 EXPORT_SYMBOL(dt_find_or_create);
428
429 /* dt class init function. */
430 int dt_global_init(void)
431 {
432         int result;
433
434         LU_CONTEXT_KEY_INIT(&dt_key);
435         result = lu_context_key_register(&dt_key);
436         return result;
437 }
438
439 void dt_global_fini(void)
440 {
441         lu_context_key_degister(&dt_key);
442 }
443
444 /**
445  * Generic read helper. May return an error for partial reads.
446  *
447  * \param env  lustre environment
448  * \param dt   object to be read
449  * \param buf  lu_buf to be filled, with buffer pointer and length
450  * \param pos position to start reading, updated as data is read
451  *
452  * \retval real size of data read
453  * \retval -ve errno on failure
454  */
455 int dt_read(const struct lu_env *env, struct dt_object *dt,
456             struct lu_buf *buf, loff_t *pos)
457 {
458         LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
459         return dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
460 }
461 EXPORT_SYMBOL(dt_read);
462
463 /**
464  * Read structures of fixed size from storage.  Unlike dt_read(), using
465  * dt_record_read() will return an error for partial reads.
466  *
467  * \param env  lustre environment
468  * \param dt   object to be read
469  * \param buf  lu_buf to be filled, with buffer pointer and length
470  * \param pos position to start reading, updated as data is read
471  *
472  * \retval 0 on successfully reading full buffer
473  * \retval -EFAULT on short read
474  * \retval -ve errno on failure
475  */
476 int dt_record_read(const struct lu_env *env, struct dt_object *dt,
477                    struct lu_buf *buf, loff_t *pos)
478 {
479         int rc;
480
481         LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
482
483         rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
484
485         if (rc == buf->lb_len)
486                 rc = 0;
487         else if (rc >= 0)
488                 rc = -EFAULT;
489         return rc;
490 }
491 EXPORT_SYMBOL(dt_record_read);
492
493 int dt_record_write(const struct lu_env *env, struct dt_object *dt,
494                     const struct lu_buf *buf, loff_t *pos, struct thandle *th)
495 {
496         int rc;
497
498         LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
499         LASSERT(th != NULL);
500         LASSERT(dt->do_body_ops);
501         LASSERT(dt->do_body_ops->dbo_write);
502         rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1);
503         if (rc == buf->lb_len)
504                 rc = 0;
505         else if (rc >= 0)
506                 rc = -EFAULT;
507         return rc;
508 }
509 EXPORT_SYMBOL(dt_record_write);
510
511 int dt_declare_version_set(const struct lu_env *env, struct dt_object *o,
512                            struct thandle *th)
513 {
514         struct lu_buf vbuf;
515         char *xname = XATTR_NAME_VERSION;
516
517         LASSERT(o);
518         vbuf.lb_buf = NULL;
519         vbuf.lb_len = sizeof(dt_obj_version_t);
520         return dt_declare_xattr_set(env, o, &vbuf, xname, 0, th);
521
522 }
523 EXPORT_SYMBOL(dt_declare_version_set);
524
525 void dt_version_set(const struct lu_env *env, struct dt_object *o,
526                     dt_obj_version_t version, struct thandle *th)
527 {
528         struct lu_buf vbuf;
529         char *xname = XATTR_NAME_VERSION;
530         int rc;
531
532         LASSERT(o);
533         vbuf.lb_buf = &version;
534         vbuf.lb_len = sizeof(version);
535
536         rc = dt_xattr_set(env, o, &vbuf, xname, 0, th, BYPASS_CAPA);
537         if (rc < 0)
538                 CDEBUG(D_INODE, "Can't set version, rc %d\n", rc);
539         return;
540 }
541 EXPORT_SYMBOL(dt_version_set);
542
543 dt_obj_version_t dt_version_get(const struct lu_env *env, struct dt_object *o)
544 {
545         struct lu_buf vbuf;
546         char *xname = XATTR_NAME_VERSION;
547         dt_obj_version_t version;
548         int rc;
549
550         LASSERT(o);
551         vbuf.lb_buf = &version;
552         vbuf.lb_len = sizeof(version);
553         rc = dt_xattr_get(env, o, &vbuf, xname, BYPASS_CAPA);
554         if (rc != sizeof(version)) {
555                 CDEBUG(D_INODE, "Can't get version, rc %d\n", rc);
556                 version = 0;
557         }
558         return version;
559 }
560 EXPORT_SYMBOL(dt_version_get);
561
562 const struct dt_index_features dt_directory_features;
563 EXPORT_SYMBOL(dt_directory_features);
564
565 const struct dt_index_features dt_otable_features;
566 EXPORT_SYMBOL(dt_otable_features);