Whamcloud - gitweb
LU-1267 lfsck: framework (3) for MDT-OST consistency
[fs/lustre-release.git] / lustre / osp / osp_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osp/osp_object.c
37  *
38  * Lustre OST Proxy Device
39  *
40  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
41  * Author: Mikhail Pershin <mike.tappro@intel.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include "osp_internal.h"
47
48 static void osp_object_assign_fid(const struct lu_env *env,
49                                  struct osp_device *d, struct osp_object *o)
50 {
51         struct osp_thread_info *osi = osp_env_info(env);
52
53         LASSERT(fid_is_zero(lu_object_fid(&o->opo_obj.do_lu)));
54         LASSERT(o->opo_reserved);
55         o->opo_reserved = 0;
56
57         osp_precreate_get_fid(env, d, &osi->osi_fid);
58
59         lu_object_assign_fid(env, &o->opo_obj.do_lu, &osi->osi_fid);
60 }
61
62 static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
63                                 const struct lu_attr *attr, struct thandle *th)
64 {
65         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
66         struct osp_object       *o = dt2osp_obj(dt);
67         int                      rc = 0;
68
69         ENTRY;
70
71         /*
72          * Usually we don't allow server stack to manipulate size
73          * but there is a special case when striping is created
74          * late, after stripless file got truncated to non-zero.
75          *
76          * In this case we do the following:
77          *
78          * 1) grab id in declare - this can lead to leaked OST objects
79          *    but we don't currently have proper mechanism and the only
80          *    options we have are to do truncate RPC holding transaction
81          *    open (very bad) or to grab id in declare at cost of leaked
82          *    OST object in same very rare unfortunate case (just bad)
83          *    notice 1.6-2.0 do assignment outside of running transaction
84          *    all the time, meaning many more chances for leaked objects.
85          *
86          * 2) send synchronous truncate RPC with just assigned id
87          */
88
89         /* there are few places in MDD code still passing NULL
90          * XXX: to be fixed soon */
91         if (attr == NULL)
92                 RETURN(0);
93
94         if (attr->la_valid & LA_SIZE && attr->la_size > 0 &&
95             fid_is_zero(lu_object_fid(&o->opo_obj.do_lu))) {
96                 LASSERT(!dt_object_exists(dt));
97                 osp_object_assign_fid(env, d, o);
98                 rc = osp_object_truncate(env, dt, attr->la_size);
99                 if (rc)
100                         RETURN(rc);
101         }
102
103         if (o->opo_new) {
104                 /* no need in logging for new objects being created */
105                 RETURN(0);
106         }
107
108         if (!(attr->la_valid & (LA_UID | LA_GID)))
109                 RETURN(0);
110
111         /*
112          * track all UID/GID changes via llog
113          */
114         rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th);
115
116         RETURN(rc);
117 }
118
119 static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
120                         const struct lu_attr *attr, struct thandle *th,
121                         struct lustre_capa *capa)
122 {
123         struct osp_object       *o = dt2osp_obj(dt);
124         int                      rc = 0;
125
126         ENTRY;
127
128         /* we're interested in uid/gid changes only */
129         if (!(attr->la_valid & (LA_UID | LA_GID)))
130                 RETURN(0);
131
132         /* new object, the very first ->attr_set()
133          * initializing attributes needs no logging
134          * all subsequent one are subject to the
135          * logging and synchronization with OST */
136         if (o->opo_new) {
137                 o->opo_new = 0;
138                 RETURN(0);
139         }
140
141         /*
142          * once transaction is committed put proper command on
143          * the queue going to our OST
144          */
145         rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr);
146
147         /* XXX: send new uid/gid to OST ASAP? */
148
149         RETURN(rc);
150 }
151
152 static int osp_declare_object_create(const struct lu_env *env,
153                                      struct dt_object *dt,
154                                      struct lu_attr *attr,
155                                      struct dt_allocation_hint *hint,
156                                      struct dt_object_format *dof,
157                                      struct thandle *th)
158 {
159         struct osp_thread_info  *osi = osp_env_info(env);
160         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
161         struct osp_object       *o = dt2osp_obj(dt);
162         const struct lu_fid     *fid;
163         int                      rc = 0;
164
165         ENTRY;
166
167         /* should happen to non-0 OSP only so that at least one object
168          * has been already declared in the scenario and LOD should
169          * cleanup that */
170         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL) && d->opd_index == 1)
171                 RETURN(-ENOSPC);
172
173         LASSERT(d->opd_last_used_oid_file);
174         fid = lu_object_fid(&dt->do_lu);
175
176         /*
177          * There can be gaps in precreated ids and record to unlink llog
178          * XXX: we do not handle gaps yet, implemented before solution
179          *      was found to be racy, so we disabled that. there is no
180          *      point in making useless but expensive llog declaration.
181          */
182         /* rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); */
183
184         if (unlikely(!fid_is_zero(fid))) {
185                 /* replay case: caller knows fid */
186                 osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
187                 rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
188                                              sizeof(osi->osi_id), osi->osi_off,
189                                              th);
190                 RETURN(rc);
191         }
192
193         /*
194          * in declaration we need to reserve object so that we don't block
195          * awaiting precreation RPC to complete
196          */
197         rc = osp_precreate_reserve(env, d);
198         /*
199          * we also need to declare update to local "last used id" file for
200          * recovery if object isn't used for a reason, we need to release
201          * reservation, this can be made in osd_object_release()
202          */
203         if (rc == 0) {
204                 /* mark id is reserved: in create we don't want to talk
205                  * to OST */
206                 LASSERT(o->opo_reserved == 0);
207                 o->opo_reserved = 1;
208
209                 /* common for all OSPs file hystorically */
210                 osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
211                 rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
212                                              sizeof(osi->osi_id), osi->osi_off,
213                                              th);
214         } else {
215                 /* not needed in the cache anymore */
216                 set_bit(LU_OBJECT_HEARD_BANSHEE,
217                             &dt->do_lu.lo_header->loh_flags);
218         }
219         RETURN(rc);
220 }
221
222 static int osp_object_create(const struct lu_env *env, struct dt_object *dt,
223                              struct lu_attr *attr,
224                              struct dt_allocation_hint *hint,
225                              struct dt_object_format *dof, struct thandle *th)
226 {
227         struct osp_thread_info  *osi = osp_env_info(env);
228         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
229         struct osp_object       *o = dt2osp_obj(dt);
230         int                     rc = 0;
231         struct lu_fid           *fid = &osi->osi_fid;
232         ENTRY;
233
234         if (o->opo_reserved) {
235                 /* regular case, fid is assigned holding trunsaction open */
236                  osp_object_assign_fid(env, d, o);
237         }
238
239         memcpy(fid, lu_object_fid(&dt->do_lu), sizeof(*fid));
240
241         LASSERTF(fid_is_sane(fid), "fid for osp_object %p is insane"DFID"!\n",
242                  o, PFID(fid));
243
244         if (!o->opo_reserved) {
245                 /* special case, id was assigned outside of transaction
246                  * see comments in osp_declare_attr_set */
247                 LASSERT(d->opd_pre != NULL);
248                 spin_lock(&d->opd_pre_lock);
249                 osp_update_last_fid(d, fid);
250                 spin_unlock(&d->opd_pre_lock);
251         }
252
253         CDEBUG(D_INODE, "fid for osp_object %p is "DFID"\n", o, PFID(fid));
254
255         /* If the precreate ends, it means it will be ready to rollover to
256          * the new sequence soon, all the creation should be synchronized,
257          * otherwise during replay, the replay fid will be inconsistent with
258          * last_used/create fid */
259         if (osp_precreate_end_seq(env, d) && osp_is_fid_client(d))
260                 th->th_sync = 1;
261
262         /*
263          * it's OK if the import is inactive by this moment - id was created
264          * by OST earlier, we just need to maintain it consistently on the disk
265          * once import is reconnected, OSP will claim this and other objects
266          * used and OST either keep them, if they exist or recreate
267          */
268
269         /* we might have lost precreated objects */
270         if (unlikely(d->opd_gap_count) > 0) {
271                 LASSERT(d->opd_pre != NULL);
272                 spin_lock(&d->opd_pre_lock);
273                 if (d->opd_gap_count > 0) {
274                         int count = d->opd_gap_count;
275
276                         ostid_set_id(&osi->osi_oi,
277                                      fid_oid(&d->opd_gap_start_fid));
278                         d->opd_gap_count = 0;
279                         spin_unlock(&d->opd_pre_lock);
280
281                         CDEBUG(D_HA, "Writting gap "DFID"+%d in llog\n",
282                                PFID(&d->opd_gap_start_fid), count);
283                         /* real gap handling is disabled intil ORI-692 will be
284                          * fixed, now we only report gaps */
285                 } else {
286                         spin_unlock(&d->opd_pre_lock);
287                 }
288         }
289
290         /* new object, the very first ->attr_set()
291          * initializing attributes needs no logging */
292         o->opo_new = 1;
293
294         /* Only need update last_used oid file, seq file will only be update
295          * during seq rollover */
296         osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off,
297                            &d->opd_last_used_fid.f_oid, d->opd_index);
298
299         rc = dt_record_write(env, d->opd_last_used_oid_file, &osi->osi_lb,
300                              &osi->osi_off, th);
301
302         CDEBUG(D_HA, "%s: Wrote last used FID: "DFID", index %d: %d\n",
303                d->opd_obd->obd_name, PFID(fid), d->opd_index, rc);
304
305         RETURN(rc);
306 }
307
308 static int osp_declare_object_destroy(const struct lu_env *env,
309                                       struct dt_object *dt, struct thandle *th)
310 {
311         struct osp_object       *o = dt2osp_obj(dt);
312         int                      rc = 0;
313
314         ENTRY;
315
316         /*
317          * track objects to be destroyed via llog
318          */
319         rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
320
321         RETURN(rc);
322 }
323
324 static int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
325                               struct thandle *th)
326 {
327         struct osp_object       *o = dt2osp_obj(dt);
328         int                      rc = 0;
329
330         ENTRY;
331
332         /*
333          * once transaction is committed put proper command on
334          * the queue going to our OST
335          */
336         rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL);
337
338         /* not needed in cache any more */
339         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
340
341         RETURN(rc);
342 }
343
344 struct dt_object_operations osp_obj_ops = {
345         .do_declare_attr_set    = osp_declare_attr_set,
346         .do_attr_set            = osp_attr_set,
347         .do_declare_create      = osp_declare_object_create,
348         .do_create              = osp_object_create,
349         .do_declare_destroy     = osp_declare_object_destroy,
350         .do_destroy             = osp_object_destroy,
351 };
352
353 static int is_ost_obj(struct lu_object *lo)
354 {
355         struct osp_device  *osp  = lu2osp_dev(lo->lo_dev);
356
357         return !osp->opd_connect_mdt;
358 }
359
360 static int osp_object_init(const struct lu_env *env, struct lu_object *o,
361                            const struct lu_object_conf *conf)
362 {
363         struct osp_object       *po = lu2osp_obj(o);
364         int                     rc = 0;
365         ENTRY;
366
367         if (is_ost_obj(o)) {
368                 po->opo_obj.do_ops = &osp_obj_ops;
369         } else {
370                 struct lu_attr          *la = &osp_env_info(env)->osi_attr;
371
372                 po->opo_obj.do_ops = &osp_md_obj_ops;
373                 o->lo_header->loh_attr |= LOHA_REMOTE;
374                 rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o),
375                                                      la, NULL);
376                 if (rc == 0)
377                         o->lo_header->loh_attr |=
378                                 LOHA_EXISTS | (la->la_mode & S_IFMT);
379                 if (rc == -ENOENT)
380                         rc = 0;
381         }
382         RETURN(rc);
383 }
384
385 static void osp_object_free(const struct lu_env *env, struct lu_object *o)
386 {
387         struct osp_object       *obj = lu2osp_obj(o);
388         struct lu_object_header *h = o->lo_header;
389
390         dt_object_fini(&obj->opo_obj);
391         lu_object_header_fini(h);
392         OBD_SLAB_FREE_PTR(obj, osp_object_kmem);
393 }
394
395 static void osp_object_release(const struct lu_env *env, struct lu_object *o)
396 {
397         struct osp_object       *po = lu2osp_obj(o);
398         struct osp_device       *d  = lu2osp_dev(o->lo_dev);
399
400         ENTRY;
401
402         /*
403          * release reservation if object was declared but not created
404          * this may require lu_object_put() in LOD
405          */
406         if (unlikely(po->opo_reserved)) {
407                 LASSERT(d->opd_pre != NULL);
408                 LASSERT(d->opd_pre_reserved > 0);
409                 spin_lock(&d->opd_pre_lock);
410                 d->opd_pre_reserved--;
411                 spin_unlock(&d->opd_pre_lock);
412
413                 /* not needed in cache any more */
414                 set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
415         }
416
417         if (is_ost_obj(o))
418                 /* XXX: Currently, NOT cache OST-object on MDT because:
419                  *      1. it is not often accessed on MDT.
420                  *      2. avoid up layer (such as LFSCK) to load too many
421                  *         once-used OST-objects. */
422                 set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
423
424         EXIT;
425 }
426
427 static int osp_object_print(const struct lu_env *env, void *cookie,
428                             lu_printer_t p, const struct lu_object *l)
429 {
430         const struct osp_object *o = lu2osp_obj((struct lu_object *)l);
431
432         return (*p)(env, cookie, LUSTRE_OSP_NAME"-object@%p", o);
433 }
434
435 static int osp_object_invariant(const struct lu_object *o)
436 {
437         LBUG();
438 }
439
440 struct lu_object_operations osp_lu_obj_ops = {
441         .loo_object_init        = osp_object_init,
442         .loo_object_free        = osp_object_free,
443         .loo_object_release     = osp_object_release,
444         .loo_object_print       = osp_object_print,
445         .loo_object_invariant   = osp_object_invariant
446 };