Whamcloud - gitweb
1f48a6c7f8b3f9282588f1f81b024b638cc281d9
[fs/lustre-release.git] / lustre / ofd / ofd_objects.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ofd/ofd_objects.c
37  *
38  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
39  * Author: Mikhail Pershin <tappro@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_FILTER
43
44 #include <dt_object.h>
45
46 #include "ofd_internal.h"
47
48 int ofd_version_get_check(struct ofd_thread_info *info,
49                           struct ofd_object *fo)
50 {
51         dt_obj_version_t curr_version;
52
53         LASSERT(ofd_object_exists(fo));
54         LASSERT(info->fti_exp);
55
56         curr_version = dt_version_get(info->fti_env, ofd_object_child(fo));
57         if ((__s64)curr_version == -EOPNOTSUPP)
58                 RETURN(0);
59         /* VBR: version is checked always because costs nothing */
60         if (info->fti_pre_version != 0 &&
61             info->fti_pre_version != curr_version) {
62                 CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
63                        info->fti_pre_version, curr_version);
64                 cfs_spin_lock(&info->fti_exp->exp_lock);
65                 info->fti_exp->exp_vbr_failed = 1;
66                 cfs_spin_unlock(&info->fti_exp->exp_lock);
67                 RETURN (-EOVERFLOW);
68         }
69         info->fti_pre_version = curr_version;
70         RETURN(0);
71 }
72
73 struct ofd_object *ofd_object_find(const struct lu_env *env,
74                                    struct ofd_device *ofd,
75                                    const struct lu_fid *fid)
76 {
77         struct ofd_object *fo;
78         struct lu_object  *o;
79
80         ENTRY;
81
82         o = lu_object_find(env, &ofd->ofd_dt_dev.dd_lu_dev, fid, NULL);
83         if (likely(!IS_ERR(o)))
84                 fo = ofd_obj(o);
85         else
86                 fo = (struct ofd_object *)o; /* return error */
87         RETURN(fo);
88 }
89
90 struct ofd_object *ofd_object_find_or_create(const struct lu_env *env,
91                                              struct ofd_device *ofd,
92                                              const struct lu_fid *fid,
93                                              struct lu_attr *attr)
94 {
95         struct ofd_thread_info  *info = ofd_info(env);
96         struct lu_object        *fo_obj;
97         struct dt_object        *dto;
98
99         ENTRY;
100
101         info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
102
103         dto = dt_find_or_create(env, ofd->ofd_osd, fid, &info->fti_dof, attr);
104         if (IS_ERR(dto))
105                 RETURN((struct ofd_object *)dto);
106
107         fo_obj = lu_object_locate(dto->do_lu.lo_header,
108                                   ofd->ofd_dt_dev.dd_lu_dev.ld_type);
109         RETURN(ofd_obj(fo_obj));
110 }
111
112 int ofd_object_ff_check(const struct lu_env *env, struct ofd_object *fo)
113 {
114         struct ofd_thread_info  *info = ofd_info(env);
115         int                      rc = 0;
116
117         ENTRY;
118
119         if (!fo->ofo_ff_exists) {
120                 /*
121                  * This actually means that we don't know whether the object
122                  * has the "fid" EA or not.
123                  */
124                 info->fti_buf.lb_buf = &info->fti_mds_fid2;
125                 info->fti_buf.lb_len = sizeof(info->fti_mds_fid2);
126                 rc = dt_xattr_get(env, ofd_object_child(fo), &info->fti_buf,
127                                   XATTR_NAME_FID, BYPASS_CAPA);
128                 if (rc >= 0 || rc == -ENODATA) {
129                         /*
130                          * Here we assume that, if the object doesn't have the
131                          * "fid" EA, the caller will add one, unless a fatal
132                          * error (e.g., a memory or disk failure) prevents it
133                          * from doing so.
134                          */
135                         fo->ofo_ff_exists = 1;
136                 }
137                 if (rc > 0)
138                         rc = 0;
139         }
140         RETURN(rc);
141 }
142
143 void ofd_object_put(const struct lu_env *env, struct ofd_object *fo)
144 {
145         lu_object_put(env, &fo->ofo_obj.do_lu);
146 }
147
148 int ofd_precreate_object(const struct lu_env *env, struct ofd_device *ofd,
149                          obd_id id, obd_seq group)
150 {
151         struct ofd_thread_info  *info = ofd_info(env);
152         struct ofd_object       *fo;
153         struct dt_object        *next;
154         struct thandle          *th;
155         obd_id                   tmp;
156         int                      rc;
157
158         ENTRY;
159
160         /* Don't create objects beyond the valid range for this SEQ */
161         if (unlikely(fid_seq_is_mdt0(group) && id >= IDIF_MAX_OID)) {
162                 CERROR("%s:"POSTID" hit the IDIF_MAX_OID (1<<48)!\n",
163                        ofd_name(ofd), id, group);
164                 RETURN(rc = -ENOSPC);
165         } else if (unlikely(!fid_seq_is_mdt0(group) && id >= OBIF_MAX_OID)) {
166                 CERROR("%s:"POSTID" hit the OBIF_MAX_OID (1<<32)!\n",
167                        ofd_name(ofd), id, group);
168                 RETURN(rc = -ENOSPC);
169         }
170         info->fti_ostid.oi_id = id;
171         info->fti_ostid.oi_seq = group;
172         fid_ostid_unpack(&info->fti_fid, &info->fti_ostid, 0);
173
174         fo = ofd_object_find(env, ofd, &info->fti_fid);
175         if (IS_ERR(fo))
176                 RETURN(PTR_ERR(fo));
177
178         info->fti_attr.la_valid = LA_TYPE | LA_MODE;
179         /*
180          * We mark object SUID+SGID to flag it for accepting UID+GID from
181          * client on first write.  Currently the permission bits on the OST are
182          * never used, so this is OK.
183          */
184         info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | 0666;
185         info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
186
187         /* Initialize a/c/m time so any client timestamp will always
188          * be newer and update the inode. ctime = 0 is also handled
189          * specially in osd_inode_setattr(). See LU-221, LU-1042 */
190         info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME;
191         info->fti_attr.la_atime = 0;
192         info->fti_attr.la_mtime = 0;
193         info->fti_attr.la_ctime = 0;
194
195         next = ofd_object_child(fo);
196         LASSERT(next != NULL);
197
198         info->fti_buf.lb_buf = &tmp;
199         info->fti_buf.lb_len = sizeof(tmp);
200         info->fti_off = 0;
201
202         ofd_write_lock(env, fo);
203         th = ofd_trans_create(env, ofd);
204         if (IS_ERR(th))
205                 GOTO(out_unlock, rc = PTR_ERR(th));
206
207         rc = dt_declare_record_write(env, ofd->ofd_lastid_obj[group],
208                                      sizeof(tmp), info->fti_off, th);
209         if (rc)
210                 GOTO(trans_stop, rc);
211
212         if (unlikely(ofd_object_exists(fo))) {
213                 /* object may exist being re-created by write replay */
214                 CDEBUG(D_INODE, "object %u/"LPD64" exists: "DFID"\n",
215                        (unsigned) group, id, PFID(&info->fti_fid));
216                 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
217                 if (rc)
218                         GOTO(trans_stop, rc);
219                 GOTO(last_id_write, rc);
220         }
221         rc = dt_declare_create(env, next, &info->fti_attr, NULL,
222                                &info->fti_dof, th);
223         if (rc)
224                 GOTO(trans_stop, rc);
225
226         rc = dt_trans_start_local(env, ofd->ofd_osd, th);
227         if (rc)
228                 GOTO(trans_stop, rc);
229
230         CDEBUG(D_OTHER, "create new object %lu:%llu\n",
231                (unsigned long) info->fti_fid.f_oid, info->fti_fid.f_seq);
232
233         rc = dt_create(env, next, &info->fti_attr, NULL, &info->fti_dof, th);
234         if (rc)
235                 GOTO(trans_stop, rc);
236         LASSERT(ofd_object_exists(fo));
237
238 last_id_write:
239         ofd_last_id_set(ofd, id, group);
240
241         tmp = cpu_to_le64(ofd_last_id(ofd, group));
242         rc = dt_record_write(env, ofd->ofd_lastid_obj[group], &info->fti_buf,
243                              &info->fti_off, th);
244 trans_stop:
245         ofd_trans_stop(env, ofd, th, rc);
246 out_unlock:
247         ofd_write_unlock(env, fo);
248         ofd_object_put(env, fo);
249         RETURN(rc);
250 }
251
252 /*
253  * If the object still has SUID+SGID bits set (see ofd_precreate_object()) then
254  * we will accept the UID+GID if sent by the client for initializing the
255  * ownership of this object.  We only allow this to happen once (so clear these
256  * bits) and later only allow setattr.
257  */
258 int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo,
259                          struct lu_attr *la, int is_setattr)
260 {
261         struct ofd_thread_info  *info = ofd_info(env);
262         struct lu_attr          *ln = &info->fti_attr2;
263         __u32                    mask = 0;
264         int                      rc;
265
266         ENTRY;
267
268         if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID))
269                 RETURN(0);
270
271         rc = dt_attr_get(env, ofd_object_child(fo), ln, BYPASS_CAPA);
272         if (rc != 0)
273                 RETURN(rc);
274
275         LASSERT(ln->la_valid & LA_MODE);
276
277         if (!is_setattr) {
278                 if (!(ln->la_mode & S_ISUID))
279                         la->la_valid &= ~LA_UID;
280                 if (!(ln->la_mode & S_ISGID))
281                         la->la_valid &= ~LA_GID;
282         }
283
284         if ((la->la_valid & LA_UID) && (ln->la_mode & S_ISUID))
285                 mask |= S_ISUID;
286         if ((la->la_valid & LA_GID) && (ln->la_mode & S_ISGID))
287                 mask |= S_ISGID;
288         if (mask != 0) {
289                 if (!(la->la_valid & LA_MODE) || !is_setattr) {
290                         la->la_mode = ln->la_mode;
291                         la->la_valid |= LA_MODE;
292                 }
293                 la->la_mode &= ~mask;
294         }
295
296         RETURN(0);
297 }
298
299 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
300                  struct lu_attr *la, struct filter_fid *ff)
301 {
302         struct ofd_thread_info  *info = ofd_info(env);
303         struct ofd_device       *ofd = ofd_obj2dev(fo);
304         struct thandle          *th;
305         struct ofd_mod_data     *fmd;
306         int                      ff_needed = 0;
307         int                      rc;
308         ENTRY;
309
310         ofd_write_lock(env, fo);
311         if (!ofd_object_exists(fo))
312                 GOTO(unlock, rc = -ENOENT);
313
314         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
315                 fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid);
316                 if (fmd && fmd->fmd_mactime_xid < info->fti_xid)
317                         fmd->fmd_mactime_xid = info->fti_xid;
318                 ofd_fmd_put(info->fti_exp, fmd);
319         }
320
321         /* VBR: version recovery check */
322         rc = ofd_version_get_check(info, fo);
323         if (rc)
324                 GOTO(unlock, rc);
325
326         rc = ofd_attr_handle_ugid(env, fo, la, 1 /* is_setattr */);
327         if (rc != 0)
328                 GOTO(unlock, rc);
329
330         if (ff != NULL) {
331                 rc = ofd_object_ff_check(env, fo);
332                 if (rc == -ENODATA)
333                         ff_needed = 1;
334                 else if (rc < 0)
335                         GOTO(unlock, rc);
336         }
337
338         th = ofd_trans_create(env, ofd);
339         if (IS_ERR(th))
340                 GOTO(unlock, rc = PTR_ERR(th));
341
342         rc = dt_declare_attr_set(env, ofd_object_child(fo), la, th);
343         if (rc)
344                 GOTO(stop, rc);
345
346         if (ff_needed) {
347                 info->fti_buf.lb_buf = ff;
348                 info->fti_buf.lb_len = sizeof(*ff);
349                 rc = dt_declare_xattr_set(env, ofd_object_child(fo),
350                                           &info->fti_buf, XATTR_NAME_FID, 0,
351                                           th);
352                 if (rc)
353                         GOTO(stop, rc);
354         }
355
356         rc = ofd_trans_start(env, ofd, la->la_valid & LA_SIZE ? fo : NULL, th);
357         if (rc)
358                 GOTO(stop, rc);
359
360         rc = dt_attr_set(env, ofd_object_child(fo), la, th,
361                          ofd_object_capa(env, fo));
362         if (rc)
363                 GOTO(stop, rc);
364
365         if (ff_needed)
366                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
367                                   XATTR_NAME_FID, 0, th, BYPASS_CAPA);
368
369 stop:
370         ofd_trans_stop(env, ofd, th, rc);
371 unlock:
372         ofd_write_unlock(env, fo);
373         RETURN(rc);
374 }
375
376 int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
377                      __u64 start, __u64 end, struct lu_attr *la,
378                      struct filter_fid *ff)
379 {
380         struct ofd_thread_info  *info = ofd_info(env);
381         struct ofd_device       *ofd = ofd_obj2dev(fo);
382         struct ofd_mod_data     *fmd;
383         struct dt_object        *dob = ofd_object_child(fo);
384         struct thandle          *th;
385         int                      ff_needed = 0;
386         int                      rc;
387
388         ENTRY;
389
390         /* we support truncate, not punch yet */
391         LASSERT(end == OBD_OBJECT_EOF);
392
393         fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid);
394         if (fmd && fmd->fmd_mactime_xid < info->fti_xid)
395                 fmd->fmd_mactime_xid = info->fti_xid;
396         ofd_fmd_put(info->fti_exp, fmd);
397
398         ofd_write_lock(env, fo);
399         if (!ofd_object_exists(fo))
400                 GOTO(unlock, rc = -ENOENT);
401
402         /* VBR: version recovery check */
403         rc = ofd_version_get_check(info, fo);
404         if (rc)
405                 GOTO(unlock, rc);
406
407         rc = ofd_attr_handle_ugid(env, fo, la, 0 /* !is_setattr */);
408         if (rc != 0)
409                 GOTO(unlock, rc);
410
411         if (ff != NULL) {
412                 rc = ofd_object_ff_check(env, fo);
413                 if (rc == -ENODATA)
414                         ff_needed = 1;
415                 else if (rc < 0)
416                         GOTO(unlock, rc);
417         }
418
419         th = ofd_trans_create(env, ofd);
420         if (IS_ERR(th))
421                 GOTO(unlock, rc = PTR_ERR(th));
422
423         rc = dt_declare_attr_set(env, dob, la, th);
424         if (rc)
425                 GOTO(stop, rc);
426
427         rc = dt_declare_punch(env, dob, start, OBD_OBJECT_EOF, th);
428         if (rc)
429                 GOTO(stop, rc);
430
431         if (ff_needed) {
432                 info->fti_buf.lb_buf = ff;
433                 info->fti_buf.lb_len = sizeof(*ff);
434                 rc = dt_declare_xattr_set(env, ofd_object_child(fo),
435                                           &info->fti_buf, XATTR_NAME_FID, 0,
436                                           th);
437                 if (rc)
438                         GOTO(stop, rc);
439         }
440
441         rc = ofd_trans_start(env, ofd, fo, th);
442         if (rc)
443                 GOTO(stop, rc);
444
445         rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th,
446                       ofd_object_capa(env, fo));
447         if (rc)
448                 GOTO(stop, rc);
449
450         rc = dt_attr_set(env, dob, la, th, ofd_object_capa(env, fo));
451         if (rc)
452                 GOTO(stop, rc);
453
454         if (ff_needed)
455                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
456                                   XATTR_NAME_FID, 0, th, BYPASS_CAPA);
457
458 stop:
459         ofd_trans_stop(env, ofd, th, rc);
460 unlock:
461         ofd_write_unlock(env, fo);
462         RETURN(rc);
463 }
464
465 int ofd_object_destroy(const struct lu_env *env, struct ofd_object *fo,
466                        int orphan)
467 {
468         struct ofd_device       *ofd = ofd_obj2dev(fo);
469         struct thandle          *th;
470         int                      rc = 0;
471
472         ENTRY;
473
474         ofd_write_lock(env, fo);
475         if (!ofd_object_exists(fo))
476                 GOTO(unlock, rc = -ENOENT);
477
478         th = ofd_trans_create(env, ofd);
479         if (IS_ERR(th))
480                 GOTO(unlock, rc = PTR_ERR(th));
481
482         dt_declare_ref_del(env, ofd_object_child(fo), th);
483         dt_declare_destroy(env, ofd_object_child(fo), th);
484         if (orphan)
485                 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
486         else
487                 rc = ofd_trans_start(env, ofd, NULL, th);
488         if (rc)
489                 GOTO(stop, rc);
490
491         ofd_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid);
492
493         dt_ref_del(env, ofd_object_child(fo), th);
494         dt_destroy(env, ofd_object_child(fo), th);
495 stop:
496         ofd_trans_stop(env, ofd, th, rc);
497 unlock:
498         ofd_write_unlock(env, fo);
499         RETURN(rc);
500 }
501
502 int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo,
503                  struct lu_attr *la)
504 {
505         int rc = 0;
506
507         ENTRY;
508
509         if (ofd_object_exists(fo)) {
510                 rc = dt_attr_get(env, ofd_object_child(fo), la,
511                                  ofd_object_capa(env, fo));
512
513 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 50, 0)
514                 /* Try to correct for a bug in 2.1.0 (LU-221) that caused
515                  * negative timestamps to appear to be in the far future,
516                  * due old timestamp being stored on disk as an unsigned value.
517                  * This fixes up any bad values stored on disk before
518                  * returning them to the client, and ensures any timestamp
519                  * updates are correct.  LU-1042 */
520                 if (unlikely(la->la_atime == LU221_BAD_TIME))
521                         la->la_atime = 0;
522                 if (unlikely(la->la_mtime == LU221_BAD_TIME))
523                         la->la_mtime = 0;
524                 if (unlikely(la->la_ctime == LU221_BAD_TIME))
525                         la->la_ctime = 0;
526 #else
527 #warning "remove old LU-221/LU-1042 workaround code"
528 #endif
529         } else {
530                 rc = -ENOENT;
531         }
532         RETURN(rc);
533 }