Whamcloud - gitweb
d4e11ca6ceb1d954cb1e4f4dd6e4f0725dbf3496
[fs/lustre-release.git] / lustre / mdt / mdt_lvb.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * lustre/mdt/mdt_lvb.c
28  *
29  * Author: Jinshan Xiong <jinshan.xiong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_MDS
33 #include <lustre_swab.h>
34 #include "mdt_internal.h"
35
36 /* Called with res->lr_lvb_sem held */
37 static int mdt_lvbo_init(const struct lu_env *env, struct ldlm_resource *res)
38 {
39         if (IS_LQUOTA_RES(res)) {
40                 struct mdt_device       *mdt;
41
42                 mdt = ldlm_res_to_ns(res)->ns_lvbp;
43                 if (mdt->mdt_qmt_dev == NULL)
44                         return 0;
45
46                 /* call lvbo init function of quota master */
47                 return qmt_hdls.qmth_lvbo_init(mdt->mdt_qmt_dev, res);
48         }
49         return 0;
50 }
51
52 int mdt_dom_lvb_alloc(struct ldlm_resource *res)
53 {
54         struct ost_lvb *lvb;
55
56         mutex_lock(&res->lr_lvb_mutex);
57         if (res->lr_lvb_data == NULL) {
58                 OBD_ALLOC_PTR(lvb);
59                 if (lvb == NULL) {
60                         mutex_unlock(&res->lr_lvb_mutex);
61                         return -ENOMEM;
62                 }
63
64                 res->lr_lvb_data = lvb;
65                 res->lr_lvb_len = sizeof(*lvb);
66
67                 /* Store error in LVB to inidicate it has no data yet.
68                  */
69                 OST_LVB_SET_ERR(lvb->lvb_blocks, -ENODATA);
70         }
71         mutex_unlock(&res->lr_lvb_mutex);
72         return 0;
73 }
74
75 int mdt_dom_lvb_is_valid(struct ldlm_resource *res)
76 {
77         struct ost_lvb *res_lvb = res->lr_lvb_data;
78
79         return !(res_lvb == NULL || OST_LVB_IS_ERR(res_lvb->lvb_blocks));
80 }
81
82 int mdt_dom_disk_lvbo_update(const struct lu_env *env, struct mdt_object *mo,
83                              struct ldlm_resource *res, bool increase_only)
84 {
85         struct mdt_thread_info *info = mdt_th_info(env);
86         const struct lu_fid *fid = mdt_object_fid(mo);
87         struct ost_lvb *lvb;
88         struct md_attr *ma;
89         int rc = 0;
90
91         ENTRY;
92
93         lvb = res->lr_lvb_data;
94         LASSERT(lvb);
95
96         if (!mdt_object_exists(mo) || mdt_object_remote(mo))
97                 RETURN(-ENOENT);
98
99         ma = &info->mti_attr2;
100         ma->ma_valid = 0;
101         ma->ma_need = MA_INODE;
102         rc = mo_attr_get(env, mdt_object_child(mo), ma);
103         if (rc)
104                 RETURN(rc);
105
106         lock_res(res);
107         if (ma->ma_attr.la_size > lvb->lvb_size || !increase_only) {
108                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size from disk: "
109                        "%llu -> %llu\n", PFID(fid),
110                        lvb->lvb_size, ma->ma_attr.la_size);
111                 lvb->lvb_size = ma->ma_attr.la_size;
112         }
113
114         if (ma->ma_attr.la_mtime > lvb->lvb_mtime || !increase_only) {
115                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime from disk: "
116                        "%llu -> %llu\n", PFID(fid),
117                        lvb->lvb_mtime, ma->ma_attr.la_mtime);
118                 lvb->lvb_mtime = ma->ma_attr.la_mtime;
119         }
120         if (ma->ma_attr.la_atime > lvb->lvb_atime || !increase_only) {
121                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime from disk: "
122                        "%llu -> %llu\n", PFID(fid),
123                        lvb->lvb_atime, ma->ma_attr.la_atime);
124                 lvb->lvb_atime = ma->ma_attr.la_atime;
125         }
126         if (ma->ma_attr.la_ctime > lvb->lvb_ctime || !increase_only) {
127                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime from disk: "
128                        "%llu -> %llu\n", PFID(fid),
129                        lvb->lvb_ctime, ma->ma_attr.la_ctime);
130                 lvb->lvb_ctime = ma->ma_attr.la_ctime;
131         }
132         if (ma->ma_attr.la_blocks > lvb->lvb_blocks || !increase_only) {
133                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks from disk: "
134                        "%llu -> %llu\n", PFID(fid), lvb->lvb_blocks,
135                        (unsigned long long)ma->ma_attr.la_blocks);
136                 lvb->lvb_blocks = ma->ma_attr.la_blocks;
137         }
138         unlock_res(res);
139
140         RETURN(rc);
141 }
142
143 int mdt_dom_lvbo_update(const struct lu_env *env, struct ldlm_resource *res,
144                         struct ldlm_lock *lock, struct ptlrpc_request *req,
145                         bool increase_only)
146 {
147         struct obd_export *exp = lock ? lock->l_export : NULL;
148         struct mdt_device *mdt;
149         struct mdt_object *mo;
150         struct mdt_thread_info *info;
151         struct ost_lvb *lvb;
152         struct lu_fid *fid;
153         int rc = 0;
154
155         ENTRY;
156
157         /* Before going further let's check that OBD and export are healthy.
158          */
159         if (exp != NULL &&
160             (exp->exp_disconnected || exp->exp_failed ||
161              exp->exp_obd->obd_stopping)) {
162                 CDEBUG(D_INFO, "Skip LVB update, export is %s, obd is %s\n",
163                        exp->exp_failed ? "failed" : "disconnected",
164                        exp->exp_obd->obd_stopping ? "stopping" : "OK");
165                 RETURN(0);
166         }
167
168         rc = mdt_dom_lvb_alloc(res);
169         if (rc < 0)
170                 RETURN(rc);
171
172         mdt = ldlm_res_to_ns(res)->ns_lvbp;
173         if (mdt == NULL)
174                 RETURN(-ENOENT);
175
176         LASSERT(env);
177         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
178         if (!info) {
179                 rc = lu_env_refill_by_tags((struct lu_env *)env,
180                                            LCT_MD_THREAD, 0);
181                 if (rc)
182                         GOTO(out_env, rc);
183                 info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
184                 if (!info)
185                         GOTO(out_env, rc = -ENOMEM);
186         }
187         if (!info->mti_exp)
188                 info->mti_exp = req ? req->rq_export : NULL;
189         if (!info->mti_mdt)
190                 info->mti_mdt = mdt;
191
192         fid = &info->mti_tmp_fid2;
193         fid_extract_from_res_name(fid, &res->lr_name);
194
195         lvb = res->lr_lvb_data;
196         LASSERT(lvb);
197
198         /* Update the LVB from the network message */
199         if (req != NULL) {
200                 struct ost_lvb *rpc_lvb;
201
202                 rpc_lvb = req_capsule_server_swab_get(&req->rq_pill,
203                                                       &RMF_DLM_LVB,
204                                                       lustre_swab_ost_lvb);
205                 if (rpc_lvb == NULL)
206                         goto disk_update;
207
208                 lock_res(res);
209                 if (rpc_lvb->lvb_size > lvb->lvb_size || !increase_only) {
210                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size: "
211                                "%llu -> %llu\n", PFID(fid),
212                                lvb->lvb_size, rpc_lvb->lvb_size);
213                         lvb->lvb_size = rpc_lvb->lvb_size;
214                 }
215                 if (rpc_lvb->lvb_mtime > lvb->lvb_mtime || !increase_only) {
216                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime: "
217                                "%llu -> %llu\n", PFID(fid),
218                                lvb->lvb_mtime, rpc_lvb->lvb_mtime);
219                         lvb->lvb_mtime = rpc_lvb->lvb_mtime;
220                 }
221                 if (rpc_lvb->lvb_atime > lvb->lvb_atime || !increase_only) {
222                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime: "
223                                "%llu -> %llu\n", PFID(fid),
224                                lvb->lvb_atime, rpc_lvb->lvb_atime);
225                         lvb->lvb_atime = rpc_lvb->lvb_atime;
226                 }
227                 if (rpc_lvb->lvb_ctime > lvb->lvb_ctime || !increase_only) {
228                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime: "
229                                "%llu -> %llu\n", PFID(fid),
230                                lvb->lvb_ctime, rpc_lvb->lvb_ctime);
231                         lvb->lvb_ctime = rpc_lvb->lvb_ctime;
232                 }
233                 if (rpc_lvb->lvb_blocks > lvb->lvb_blocks || !increase_only) {
234                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks: "
235                                "%llu -> %llu\n", PFID(fid),
236                                lvb->lvb_blocks, rpc_lvb->lvb_blocks);
237                         lvb->lvb_blocks = rpc_lvb->lvb_blocks;
238                 }
239                 unlock_res(res);
240         }
241
242 disk_update:
243         /* Update the LVB from the disk inode */
244         mo = mdt_object_find(env, mdt, fid);
245         if (IS_ERR(mo))
246                 GOTO(out_env, rc = PTR_ERR(mo));
247
248         rc = mdt_dom_disk_lvbo_update(env, mo, res, !!increase_only);
249         mdt_object_put(env, mo);
250 out_env:
251         return rc;
252 }
253
254 static int mdt_lvbo_update(const struct lu_env *env, struct ldlm_resource *res,
255                            struct ldlm_lock *lock, struct ptlrpc_request *req,
256                            int increase_only)
257 {
258         ENTRY;
259
260         if (IS_LQUOTA_RES(res)) {
261                 struct mdt_device *mdt;
262
263                 mdt = ldlm_res_to_ns(res)->ns_lvbp;
264                 if (mdt->mdt_qmt_dev == NULL)
265                         return 0;
266
267                 /* call lvbo update function of quota master */
268                 return qmt_hdls.qmth_lvbo_update(mdt->mdt_qmt_dev, res, req,
269                                                  increase_only);
270         }
271
272         /* Data-on-MDT lvbo update.
273          * Like a ldlm_lock_init() the lock can be skipped and that means
274          * it is DOM resource because lvbo_update() without lock is called
275          * by MDT for DOM objects only.
276          */
277         if (lock == NULL || ldlm_has_dom(lock))
278                 return mdt_dom_lvbo_update(env, res, lock, req,
279                                            !!increase_only);
280         return 0;
281 }
282
283
284 static int mdt_lvbo_size(struct ldlm_lock *lock)
285 {
286         struct mdt_device *mdt;
287
288         /* resource on server side never changes. */
289         mdt = ldlm_res_to_ns(lock->l_resource)->ns_lvbp;
290         LASSERT(mdt != NULL);
291
292         if (IS_LQUOTA_RES(lock->l_resource)) {
293                 if (mdt->mdt_qmt_dev == NULL)
294                         return 0;
295
296                 /* call lvbo size function of quota master */
297                 return qmt_hdls.qmth_lvbo_size(mdt->mdt_qmt_dev, lock);
298         }
299
300         if (ldlm_has_dom(lock))
301                 return sizeof(struct ost_lvb);
302
303         if (ldlm_has_layout(lock))
304                 return mdt->mdt_max_mdsize;
305
306         return 0;
307 }
308
309 /**
310  * Implementation of ldlm_valblock_ops::lvbo_fill for MDT.
311  *
312  * This function is called to fill the given RPC buffer \a buf with LVB data
313  *
314  * \param[in] env               execution environment
315  * \param[in] lock              LDLM lock
316  * \param[in] buf               RPC buffer to fill
317  * \param[in,out] lvblen        lvb buffer length
318  *
319  * \retval              size of LVB data written into \a buf buffer
320  *                      or -ERANGE when the provided @lvblen is not big enough,
321  *                      and the needed lvb buffer size will be returned in
322  *                      @lvblen
323  */
324 static int mdt_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock,
325                          void *lvb, int *lvblen)
326 {
327         struct mdt_thread_info *info;
328         struct mdt_device *mdt;
329         struct lu_fid *fid;
330         struct mdt_object *obj = NULL;
331         struct md_object *child = NULL;
332         struct lu_env _env;
333         int rc;
334         ENTRY;
335
336         if (!env) {
337                 rc = lu_env_init(&_env, LCT_DT_THREAD);
338                 if (rc)
339                         RETURN(rc);
340                 env = &_env;
341         }
342
343         mdt = ldlm_lock_to_ns(lock)->ns_lvbp;
344         if (IS_LQUOTA_RES(lock->l_resource)) {
345                 if (mdt->mdt_qmt_dev == NULL)
346                         GOTO(out_env, rc = 0);
347
348                 /* call lvbo fill function of quota master */
349                 rc = qmt_hdls.qmth_lvbo_fill(mdt->mdt_qmt_dev, lock, lvb,
350                                              *lvblen);
351                 GOTO(out_env, rc);
352         }
353
354         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
355         if (!info) {
356                 rc = lu_env_refill_by_tags((struct lu_env *)env,
357                                            LCT_MD_THREAD, 0);
358                 if (rc)
359                         GOTO(out, rc);
360                 info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
361                 if (!info)
362                         GOTO(out, rc = -ENOMEM);
363         }
364         if (!info->mti_env)
365                 info->mti_env = env;
366         if (!info->mti_exp)
367                 info->mti_exp = lock->l_export;
368         if (!info->mti_mdt)
369                 info->mti_mdt = mdt;
370
371         /* LVB for DoM lock is needed only for glimpse,
372          * don't fill DoM data if there is layout lock */
373         if (ldlm_has_dom(lock)) {
374                 struct ldlm_resource *res = lock->l_resource;
375                 int lvb_len = sizeof(struct ost_lvb);
376
377                 if (!mdt_dom_lvb_is_valid(res))
378                         mdt_dom_lvbo_update(env, lock->l_resource,
379                                             lock, NULL, 0);
380
381                 if (lvb_len > *lvblen)
382                         lvb_len = *lvblen;
383
384                 lock_res(res);
385                 memcpy(lvb, res->lr_lvb_data, lvb_len);
386                 unlock_res(res);
387
388                 GOTO(out, rc = lvb_len);
389         }
390
391         /* Only fill layout if layout lock is granted */
392         if (!ldlm_has_layout(lock) || !ldlm_is_granted(lock))
393                 GOTO(out, rc = 0);
394
395         /* XXX get fid by resource id. why don't include fid in ldlm_resource */
396         fid = &info->mti_tmp_fid2;
397         fid_extract_from_res_name(fid, &lock->l_resource->lr_name);
398
399         obj = mdt_object_find(env, info->mti_mdt, fid);
400         if (IS_ERR(obj))
401                 GOTO(out, rc = PTR_ERR(obj));
402
403         if (!mdt_object_exists(obj) || mdt_object_remote(obj))
404                 GOTO(out_put, rc = -ENOENT);
405
406         child = mdt_object_child(obj);
407
408         /* get the length of lsm */
409         rc = mo_xattr_get(env, child, &LU_BUF_NULL, XATTR_NAME_LOV);
410         if (rc < 0)
411                 GOTO(out_put, rc);
412         if (rc > 0) {
413                 struct lu_buf *lmm = NULL;
414                 if (*lvblen < rc) {
415                         int level;
416
417                         /* The layout EA may be larger than mdt_max_mdsize
418                          * and in that case mdt_max_mdsize is just updated
419                          * but if EA size is less than mdt_max_mdsize then
420                          * it is an error in lvblen value provided. */
421                         if (rc > info->mti_mdt->mdt_max_mdsize) {
422                                 info->mti_mdt->mdt_max_mdsize = rc;
423                                 level = D_INFO;
424                         } else {
425                                 level = D_ERROR;
426                         }
427                         CDEBUG_LIMIT(level, "%s: small buffer size %d for EA "
428                                      "%d (max_mdsize %d): rc = %d\n",
429                                      mdt_obd_name(mdt), *lvblen, rc,
430                                      info->mti_mdt->mdt_max_mdsize, -ERANGE);
431                         *lvblen = rc;
432                         GOTO(out_put, rc = -ERANGE);
433                 }
434                 lmm = &info->mti_buf;
435                 lmm->lb_buf = lvb;
436                 lmm->lb_len = rc;
437                 rc = mo_xattr_get(env, child, lmm, XATTR_NAME_LOV);
438                 if (rc < 0)
439                         GOTO(out_put, rc);
440         }
441
442 out_put:
443         if (obj != NULL && !IS_ERR(obj))
444                 mdt_object_put(env, obj);
445 out:
446         if (rc < 0 && rc != -ERANGE)
447                 rc = 0;
448 out_env:
449         if (env == &_env)
450                 lu_env_fini(&_env);
451         RETURN(rc);
452 }
453
454 static int mdt_lvbo_free(struct ldlm_resource *res)
455 {
456         if (IS_LQUOTA_RES(res)) {
457                 struct mdt_device       *mdt;
458
459                 mdt = ldlm_res_to_ns(res)->ns_lvbp;
460                 if (mdt->mdt_qmt_dev == NULL)
461                         return 0;
462
463                 /* call lvbo free function of quota master */
464                 return qmt_hdls.qmth_lvbo_free(mdt->mdt_qmt_dev, res);
465         }
466
467         /* Data-on-MDT lvbo free */
468         if (res->lr_lvb_data != NULL)
469                 OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
470         return 0;
471 }
472
473 struct ldlm_valblock_ops mdt_lvbo = {
474         .lvbo_init      = mdt_lvbo_init,
475         .lvbo_update    = mdt_lvbo_update,
476         .lvbo_size      = mdt_lvbo_size,
477         .lvbo_fill      = mdt_lvbo_fill,
478         .lvbo_free      = mdt_lvbo_free
479 };