Whamcloud - gitweb
LU-1866 lfsck: enhance otable-based iteration
[fs/lustre-release.git] / lustre / quota / qsd_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34
35 #define DEBUG_SUBSYSTEM S_LQUOTA
36
37 #include <lustre_dlm.h>
38 #include <obd_class.h>
39
40 #include "qsd_internal.h"
41
42 /*
43  * Return qsd_qtype_info structure associated with a global lock
44  *
45  * \param lock - is the global lock from which we should extract the qqi
46  * \param reset - whether lock->l_ast_data should be cleared
47  */
48 static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock,
49                                                    bool reset) {
50         struct qsd_qtype_info *qqi;
51         ENTRY;
52
53         lock_res_and_lock(lock);
54         qqi = lock->l_ast_data;
55         if (qqi != NULL) {
56                 qqi_getref(qqi);
57                 if (reset)
58                         lock->l_ast_data = NULL;
59         }
60         unlock_res_and_lock(lock);
61
62         if (qqi != NULL)
63                 /* it is not safe to call lu_ref_add() under spinlock */
64                 lu_ref_add(&qqi->qqi_reference, "ast_data_get", lock);
65
66         if (reset && qqi != NULL) {
67                 /* release qqi reference hold for the lock */
68                 lu_ref_del(&qqi->qqi_reference, "glb_lock", lock);
69                 qqi_putref(qqi);
70         }
71         RETURN(qqi);
72 }
73
74 /*
75  * Return lquota entry structure associated with a per-ID lock
76  *
77  * \param lock - is the per-ID lock from which we should extract the lquota
78  *               entry
79  * \param reset - whether lock->l_ast_data should be cleared
80  */
81 static struct lquota_entry *qsd_id_ast_data_get(struct ldlm_lock *lock,
82                                                 bool reset) {
83         struct lquota_entry *lqe;
84         ENTRY;
85
86         lock_res_and_lock(lock);
87         lqe = lock->l_ast_data;
88         if (lqe != NULL) {
89                 lqe_getref(lqe);
90                 if (reset)
91                         lock->l_ast_data = NULL;
92         }
93         unlock_res_and_lock(lock);
94
95         if (reset && lqe != NULL)
96                 /* release lqe reference hold for the lock */
97                 lqe_putref(lqe);
98         RETURN(lqe);
99 }
100
101 /*
102  * Glimpse callback handler for all quota locks. This function extracts
103  * information from the glimpse request.
104  *
105  * \param lock - is the lock targeted by the glimpse
106  * \param data - is a pointer to the glimpse ptlrpc request
107  * \param req  - is the glimpse request
108  * \param desc - is the glimpse descriptor describing the purpose of the glimpse
109  *               request.
110  * \param lvb  - is the pointer to the lvb in the reply buffer
111  *
112  * \retval 0 on success and \desc, \lvb & \arg point to a valid structures,
113  *         appropriate error on failure
114  */
115 static int qsd_common_glimpse_ast(struct ptlrpc_request *req,
116                                   struct ldlm_gl_lquota_desc **desc, void **lvb)
117 {
118         int rc;
119         ENTRY;
120
121         LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
122
123         /* glimpse on quota locks always packs a glimpse descriptor */
124         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_DESC_CALLBACK);
125
126         /* extract glimpse descriptor */
127         *desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
128         if (*desc == NULL)
129                 RETURN(-EFAULT);
130
131         /* prepare reply */
132         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
133                              sizeof(struct lquota_lvb));
134         rc = req_capsule_server_pack(&req->rq_pill);
135         if (rc != 0) {
136                 CERROR("Can't pack response, rc %d\n", rc);
137                 RETURN(rc);
138         }
139
140         /* extract lvb */
141         *lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
142
143         RETURN(0);
144 }
145
146 /*
147  * Blocking callback handler for global index lock
148  *
149  * \param lock - is the lock for which ast occurred.
150  * \param desc - is the description of a conflicting lock in case of blocking
151  *               ast.
152  * \param data - is the value of lock->l_ast_data
153  * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
154  *               cancellation and blocking ast's.
155  */
156 static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
157                                 struct ldlm_lock_desc *desc, void *data,
158                                 int flag)
159 {
160         int rc = 0;
161         ENTRY;
162
163         switch(flag) {
164         case LDLM_CB_BLOCKING: {
165                 struct lustre_handle lockh;
166
167                 LDLM_DEBUG(lock, "blocking AST on global quota lock");
168                 ldlm_lock2handle(lock, &lockh);
169                 rc = ldlm_cli_cancel(&lockh);
170                 break;
171         }
172         case LDLM_CB_CANCELING: {
173                 struct qsd_qtype_info *qqi;
174
175                 LDLM_DEBUG(lock, "canceling global quota lock");
176
177                 qqi = qsd_glb_ast_data_get(lock, true);
178                 if (qqi == NULL)
179                         break;
180
181                 /* we are losing the global index lock, so let's mark the
182                  * global & slave indexes as not up-to-date any more */
183                 write_lock(&qqi->qqi_qsd->qsd_lock);
184                 qqi->qqi_glb_uptodate = false;
185                 qqi->qqi_slv_uptodate = false;
186                 if (lock->l_handle.h_cookie == qqi->qqi_lockh.cookie)
187                         memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh));
188                 write_unlock(&qqi->qqi_qsd->qsd_lock);
189
190                 CDEBUG(D_QUOTA, "%s: losing global index lock for %s type\n",
191                        qqi->qqi_qsd->qsd_svname, QTYPE_NAME((qqi->qqi_qtype)));
192
193                 /* kick off reintegration thread if not running already, if
194                  * it's just local cancel (for stack clean up or eviction),
195                  * don't re-trigger the reintegration. */
196                 if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) == 0)
197                         qsd_start_reint_thread(qqi);
198
199                 lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
200                 qqi_putref(qqi);
201                 break;
202         }
203         default:
204                 LASSERTF(0, "invalid flags for blocking ast %d", flag);
205         }
206
207         RETURN(rc);
208 }
209
210 /*
211  * Glimpse callback handler for global quota lock.
212  *
213  * \param lock - is the lock targeted by the glimpse
214  * \param data - is a pointer to the glimpse ptlrpc request
215  */
216 static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
217 {
218         struct ptlrpc_request           *req = data;
219         struct qsd_qtype_info           *qqi;
220         struct ldlm_gl_lquota_desc      *desc;
221         struct lquota_lvb               *lvb;
222         struct lquota_glb_rec            rec;
223         int                              rc;
224         ENTRY;
225
226         rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
227         if (rc)
228                 GOTO(out, rc);
229
230         qqi = qsd_glb_ast_data_get(lock, false);
231         if (qqi == NULL)
232                 /* valid race */
233                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
234
235         CDEBUG(D_QUOTA, "%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64
236                " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname,
237                desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit,
238                desc->gl_softlimit);
239
240         if (desc->gl_ver == 0) {
241                 CERROR("%s: invalid global index version "LPU64"\n",
242                        qqi->qqi_qsd->qsd_svname, desc->gl_ver);
243                 GOTO(out_qqi, rc = -EINVAL);
244         }
245
246         /* extract new hard & soft limits from the glimpse descriptor */
247         rec.qbr_hardlimit = desc->gl_hardlimit;
248         rec.qbr_softlimit = desc->gl_softlimit;
249         rec.qbr_time      = desc->gl_time;
250         rec.qbr_granted   = 0;
251
252         /* We can't afford disk io in the context of glimpse callback handling
253          * thread, so the on-disk global limits update has to be deferred. */
254         qsd_upd_schedule(qqi, NULL, &desc->gl_id, (union lquota_rec *)&rec,
255                          desc->gl_ver, true);
256         EXIT;
257 out_qqi:
258         lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
259         qqi_putref(qqi);
260 out:
261         req->rq_status = rc;
262         return rc;
263 }
264
/*
 * Enqueue parameters for the global quota index lock.
 *
 * The initializer is positional: a LDLM_PLAIN lock in LCK_CR mode, followed by
 * the blocking (qsd_glb_blocking_ast), completion (ldlm_completion_ast) and
 * glimpse (qsd_glb_glimpse_ast) callbacks; the trailing members are left NULL.
 * NOTE(review): the order relies on the layout of struct ldlm_enqueue_info,
 * which is declared outside of this file - confirm when that struct changes.
 */
struct ldlm_enqueue_info qsd_glb_einfo = { LDLM_PLAIN,
                                           LCK_CR,
                                           qsd_glb_blocking_ast,
                                           ldlm_completion_ast,
                                           qsd_glb_glimpse_ast,
                                           NULL, NULL };
271 /*
272  * Blocking callback handler for per-ID lock
273  *
274  * \param lock - is the lock for which ast occurred.
275  * \param desc - is the description of a conflicting lock in case of blocking
276  *               ast.
277  * \param data - is the value of lock->l_ast_data
278  * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
279  *               cancellation and blocking ast's.
280  */
281 static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
282                                void *data, int flag)
283 {
284         struct lustre_handle    lockh;
285         int                     rc = 0;
286         ENTRY;
287
288         switch(flag) {
289         case LDLM_CB_BLOCKING: {
290
291                 LDLM_DEBUG(lock, "blocking AST on ID quota lock");
292                 ldlm_lock2handle(lock, &lockh);
293                 rc = ldlm_cli_cancel(&lockh);
294                 break;
295         }
296         case LDLM_CB_CANCELING: {
297                 struct lu_env           *env;
298                 struct lquota_entry     *lqe;
299                 bool                     rel = false;
300
301                 LDLM_DEBUG(lock, "canceling global quota lock");
302                 lqe = qsd_id_ast_data_get(lock, true);
303                 if (lqe == NULL)
304                         break;
305
306                 LQUOTA_DEBUG(lqe, "losing ID lock");
307
308                 /* just local cancel (for stack clean up or eviction), don't
309                  * release quota space in this case */
310                 if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) != 0) {
311                         lqe_putref(lqe);
312                         break;
313                 }
314
315                 /* allocate environment */
316                 OBD_ALLOC_PTR(env);
317                 if (env == NULL) {
318                         lqe_putref(lqe);
319                         rc = -ENOMEM;
320                         break;
321                 }
322
323                 /* initialize environment */
324                 rc = lu_env_init(env, LCT_DT_THREAD);
325                 if (rc) {
326                         OBD_FREE_PTR(env);
327                         lqe_putref(lqe);
328                         break;
329                 }
330
331                 ldlm_lock2handle(lock, &lockh);
332                 lqe_write_lock(lqe);
333                 if (lustre_handle_equal(&lockh, &lqe->lqe_lockh)) {
334                         /* Clear lqe_lockh & reset qunit to 0 */
335                         qsd_set_qunit(lqe, 0);
336                         memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
337                         lqe->lqe_edquot = false;
338                         rel = true;
339                 }
340                 lqe_write_unlock(lqe);
341
342                 /* If there is qqacq inflight, the release will be skipped
343                  * at this time, and triggered on dqacq completion later,
344                  * which means there could be a short window that slave is
345                  * holding spare grant wihtout per-ID lock. */
346                 if (rel)
347                         rc = qsd_adjust(env, lqe);
348
349                 /* release lqe reference grabbed by qsd_id_ast_data_get() */
350                 lqe_putref(lqe);
351                 lu_env_fini(env);
352                 OBD_FREE_PTR(env);
353                 break;
354         }
355         default:
356                 LASSERTF(0, "invalid flags for blocking ast %d", flag);
357         }
358
359         RETURN(rc);
360 }
361
362 /*
363  * Glimpse callback handler for per-ID quota locks.
364  *
365  * \param lock - is the lock targeted by the glimpse
366  * \param data - is a pointer to the glimpse ptlrpc request
367  */
368 static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
369 {
370         struct ptlrpc_request           *req = data;
371         struct lquota_entry             *lqe;
372         struct qsd_instance             *qsd;
373         struct ldlm_gl_lquota_desc      *desc;
374         struct lquota_lvb               *lvb;
375         int                              rc;
376         bool                             wakeup = false;
377         ENTRY;
378
379         rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
380         if (rc)
381                 GOTO(out, rc);
382
383         lqe = qsd_id_ast_data_get(lock, false);
384         if (lqe == NULL)
385                 /* valid race */
386                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
387
388         LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:"LPU64,
389                      desc->gl_qunit);
390
391         qsd = lqe2qqi(lqe)->qqi_qsd;
392
393         lqe_write_lock(lqe);
394         lvb->lvb_id_rel = 0;
395         if (desc->gl_qunit != 0 && desc->gl_qunit != lqe->lqe_qunit) {
396                 long long space;
397
398                 /* extract new qunit from glimpse request */
399                 qsd_set_qunit(lqe, desc->gl_qunit);
400
401                 space  = lqe->lqe_granted - lqe->lqe_pending_rel;
402                 space -= lqe->lqe_usage;
403                 space -= lqe->lqe_pending_write + lqe->lqe_waiting_write;
404                 space -= lqe->lqe_qunit;
405
406                 if (space > 0) {
407                         if (lqe->lqe_pending_req > 0) {
408                                 LQUOTA_DEBUG(lqe, "request in flight, postpone "
409                                              "release of "LPD64, space);
410                                 lvb->lvb_id_may_rel = space;
411                         } else {
412                                 lqe->lqe_pending_req++;
413
414                                 /* release quota space in glimpse reply */
415                                 LQUOTA_DEBUG(lqe, "releasing "LPD64, space);
416                                 lqe->lqe_granted -= space;
417                                 lvb->lvb_id_rel   = space;
418
419                                 lqe_write_unlock(lqe);
420                                 /* change the lqe_granted */
421                                 qsd_upd_schedule(lqe2qqi(lqe), lqe, &lqe->lqe_id,
422                                                  (union lquota_rec *)&lqe->lqe_granted,
423                                                  0, false);
424                                 lqe_write_lock(lqe);
425
426                                 lqe->lqe_pending_req--;
427                                 wakeup = true;
428                         }
429                 }
430         }
431
432         lqe->lqe_edquot = !!(desc->gl_flags & LQUOTA_FL_EDQUOT);
433         lqe_write_unlock(lqe);
434
435         if (wakeup)
436                 cfs_waitq_broadcast(&lqe->lqe_waiters);
437         lqe_putref(lqe);
438 out:
439         req->rq_status = rc;
440         RETURN(rc);
441 }
442
/*
 * Enqueue parameters for the per-ID quota locks.
 *
 * The initializer is positional: a LDLM_PLAIN lock in LCK_CR mode, followed by
 * the blocking (qsd_id_blocking_ast), completion (ldlm_completion_ast) and
 * glimpse (qsd_id_glimpse_ast) callbacks; the trailing members are left NULL.
 * The ei_mode member is also used in this file when taking & dropping
 * references on per-ID locks.
 * NOTE(review): the order relies on the layout of struct ldlm_enqueue_info,
 * which is declared outside of this file - confirm when that struct changes.
 */
struct ldlm_enqueue_info qsd_id_einfo = { LDLM_PLAIN,
                                          LCK_CR,
                                          qsd_id_blocking_ast,
                                          ldlm_completion_ast,
                                          qsd_id_glimpse_ast,
                                          NULL, NULL };
449
450 /*
451  * Check whether a slave already own a ldlm lock for the quota identifier \qid.
452  *
453  * \param lockh  - is the local lock handle from lquota entry.
454  * \param rlockh - is the remote lock handle of the matched lock, if any.
455  *
456  * \retval 0      : on successful look up and \lockh contains the lock handle.
457  * \retval -ENOENT: no lock found
458  */
459 int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh)
460 {
461         struct ldlm_lock        *lock;
462         int                      rc;
463         ENTRY;
464
465         LASSERT(lockh);
466
467         if (!lustre_handle_is_used(lockh))
468                 RETURN(-ENOENT);
469
470         rc = ldlm_lock_addref_try(lockh, qsd_id_einfo.ei_mode);
471         if (rc)
472                 RETURN(-ENOENT);
473
474         LASSERT(lustre_handle_is_used(lockh));
475         ldlm_lock_dump_handle(D_QUOTA, lockh);
476
477         if (rlockh == NULL)
478                 /* caller not interested in remote handle */
479                 RETURN(0);
480
481         /* look up lock associated with local handle and extract remote handle
482          * to be packed in quota request */
483         lock = ldlm_handle2lock(lockh);
484         LASSERT(lock != NULL);
485         lustre_handle_copy(rlockh, &lock->l_remote_handle);
486         LDLM_LOCK_PUT(lock);
487
488         RETURN(0);
489 }
490
491 int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe)
492 {
493         struct qsd_thread_info  *qti = qsd_info(env);
494         int                      rc;
495         ENTRY;
496
497         lqe_write_lock(lqe);
498         if (lqe->lqe_pending_write || lqe->lqe_waiting_write ||
499             lqe->lqe_usage || lqe->lqe_granted) {
500                 lqe_write_unlock(lqe);
501                 RETURN(0);
502         }
503
504         lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
505         if (lustre_handle_is_used(&qti->qti_lockh)) {
506                 memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
507                 qsd_set_qunit(lqe, 0);
508                 lqe->lqe_edquot = false;
509         }
510         lqe_write_unlock(lqe);
511
512         rc = qsd_id_lock_match(&qti->qti_lockh, NULL);
513         if (rc)
514                 RETURN(rc);
515
516         ldlm_lock_decref_and_cancel(&qti->qti_lockh, qsd_id_einfo.ei_mode);
517         RETURN(0);
518 }