Whamcloud - gitweb
6fd2de044c68dffc69095540b869617ea14362f7
[fs/lustre-release.git] / lustre / quota / qsd_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34
35 #define DEBUG_SUBSYSTEM S_LQUOTA
36
37 #include <lustre_dlm.h>
38 #include <obd_class.h>
39
40 #include "qsd_internal.h"
41
42 /*
43  * Return qsd_qtype_info structure associated with a global lock
44  *
45  * \param lock - is the global lock from which we should extract the qqi
46  * \param reset - whether lock->l_ast_data should be cleared
47  */
48 static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock,
49                                                    bool reset) {
50         struct qsd_qtype_info *qqi;
51         ENTRY;
52
53         lock_res_and_lock(lock);
54         qqi = lock->l_ast_data;
55         if (qqi != NULL) {
56                 qqi_getref(qqi);
57                 lu_ref_add(&qqi->qqi_reference, "ast_data_get", lock);
58                 if (reset)
59                         lock->l_ast_data = NULL;
60         }
61         unlock_res_and_lock(lock);
62
63         if (reset && qqi != NULL) {
64                 /* release qqi reference hold for the lock */
65                 qqi_putref(qqi);
66                 lu_ref_del(&qqi->qqi_reference, "glb_lock", lock);
67         }
68         RETURN(qqi);
69 }
70
71 /*
72  * Return lquota entry structure associated with a per-ID lock
73  *
74  * \param lock - is the per-ID lock from which we should extract the lquota
75  *               entry
76  * \param reset - whether lock->l_ast_data should be cleared
77  */
78 static struct lquota_entry *qsd_id_ast_data_get(struct ldlm_lock *lock,
79                                                 bool reset) {
80         struct lquota_entry *lqe;
81         ENTRY;
82
83         lock_res_and_lock(lock);
84         lqe = lock->l_ast_data;
85         if (lqe != NULL) {
86                 lqe_getref(lqe);
87                 if (reset)
88                         lock->l_ast_data = NULL;
89         }
90         unlock_res_and_lock(lock);
91
92         if (reset && lqe != NULL)
93                 /* release lqe reference hold for the lock */
94                 lqe_putref(lqe);
95         RETURN(lqe);
96 }
97
98 /*
99  * Glimpse callback handler for all quota locks. This function extracts
100  * information from the glimpse request.
101  *
102  * \param lock - is the lock targeted by the glimpse
103  * \param data - is a pointer to the glimpse ptlrpc request
104  * \param req  - is the glimpse request
105  * \param desc - is the glimpse descriptor describing the purpose of the glimpse
106  *               request.
107  * \param lvb  - is the pointer to the lvb in the reply buffer
108  *
109  * \retval 0 on success and \desc, \lvb & \arg point to a valid structures,
110  *         appropriate error on failure
111  */
112 static int qsd_common_glimpse_ast(struct ptlrpc_request *req,
113                                   struct ldlm_gl_lquota_desc **desc, void **lvb)
114 {
115         int rc;
116         ENTRY;
117
118         LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
119
120         /* glimpse on quota locks always packs a glimpse descriptor */
121         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_DESC_CALLBACK);
122
123         /* extract glimpse descriptor */
124         *desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
125         if (*desc == NULL)
126                 RETURN(-EFAULT);
127
128         /* prepare reply */
129         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
130                              sizeof(struct lquota_lvb));
131         rc = req_capsule_server_pack(&req->rq_pill);
132         if (rc != 0) {
133                 CERROR("Can't pack response, rc %d\n", rc);
134                 RETURN(rc);
135         }
136
137         /* extract lvb */
138         *lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
139
140         RETURN(0);
141 }
142
143 /*
144  * Blocking callback handler for global index lock
145  *
146  * \param lock - is the lock for which ast occurred.
147  * \param desc - is the description of a conflicting lock in case of blocking
148  *               ast.
149  * \param data - is the value of lock->l_ast_data
150  * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
151  *               cancellation and blocking ast's.
152  */
153 static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
154                                 struct ldlm_lock_desc *desc, void *data,
155                                 int flag)
156 {
157         int rc = 0;
158         ENTRY;
159
160         switch(flag) {
161         case LDLM_CB_BLOCKING: {
162                 struct lustre_handle lockh;
163
164                 LDLM_DEBUG(lock, "blocking AST on global quota lock");
165                 ldlm_lock2handle(lock, &lockh);
166                 rc = ldlm_cli_cancel(&lockh);
167                 break;
168         }
169         case LDLM_CB_CANCELING: {
170                 struct qsd_qtype_info *qqi;
171
172                 LDLM_DEBUG(lock, "canceling global quota lock");
173
174                 qqi = qsd_glb_ast_data_get(lock, true);
175                 if (qqi == NULL)
176                         break;
177
178                 /* we are losing the global index lock, so let's mark the
179                  * global & slave indexes as not up-to-date any more */
180                 write_lock(&qqi->qqi_qsd->qsd_lock);
181                 qqi->qqi_glb_uptodate = false;
182                 qqi->qqi_slv_uptodate = false;
183                 if (lock->l_handle.h_cookie == qqi->qqi_lockh.cookie)
184                         memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh));
185                 write_unlock(&qqi->qqi_qsd->qsd_lock);
186
187                 CDEBUG(D_QUOTA, "%s: losing global index lock for %s type\n",
188                        qqi->qqi_qsd->qsd_svname, QTYPE_NAME((qqi->qqi_qtype)));
189
190                 /* kick off reintegration thread if not running already, if
191                  * it's just local cancel (for stack clean up or eviction),
192                  * don't re-trigger the reintegration. */
193                 if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) == 0)
194                         qsd_start_reint_thread(qqi);
195
196                 lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
197                 qqi_putref(qqi);
198                 break;
199         }
200         default:
201                 LASSERTF(0, "invalid flags for blocking ast %d", flag);
202         }
203
204         RETURN(rc);
205 }
206
207 /*
208  * Glimpse callback handler for global quota lock.
209  *
210  * \param lock - is the lock targeted by the glimpse
211  * \param data - is a pointer to the glimpse ptlrpc request
212  */
213 static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
214 {
215         struct ptlrpc_request           *req = data;
216         struct qsd_qtype_info           *qqi;
217         struct ldlm_gl_lquota_desc      *desc;
218         struct lquota_lvb               *lvb;
219         struct lquota_glb_rec            rec;
220         int                              rc;
221         ENTRY;
222
223         rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
224         if (rc)
225                 GOTO(out, rc);
226
227         qqi = qsd_glb_ast_data_get(lock, false);
228         if (qqi == NULL)
229                 /* valid race */
230                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
231
232         CDEBUG(D_QUOTA, "%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64
233                " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname,
234                desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit,
235                desc->gl_softlimit);
236
237         if (desc->gl_ver == 0) {
238                 CERROR("%s: invalid global index version "LPU64"\n",
239                        qqi->qqi_qsd->qsd_svname, desc->gl_ver);
240                 GOTO(out_qqi, rc = -EINVAL);
241         }
242
243         /* extract new hard & soft limits from the glimpse descriptor */
244         rec.qbr_hardlimit = desc->gl_hardlimit;
245         rec.qbr_softlimit = desc->gl_softlimit;
246         rec.qbr_time      = desc->gl_time;
247         rec.qbr_granted   = 0;
248
249         /* We can't afford disk io in the context of glimpse callback handling
250          * thread, so the on-disk global limits update has to be deferred. */
251         qsd_upd_schedule(qqi, NULL, &desc->gl_id, (union lquota_rec *)&rec,
252                          desc->gl_ver, true);
253         EXIT;
254 out_qqi:
255         lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
256         qqi_putref(qqi);
257 out:
258         req->rq_status = rc;
259         return rc;
260 }
261
262 struct ldlm_enqueue_info qsd_glb_einfo = { LDLM_PLAIN,
263                                            LCK_CR,
264                                            qsd_glb_blocking_ast,
265                                            ldlm_completion_ast,
266                                            qsd_glb_glimpse_ast,
267                                            NULL, NULL };
268 /*
269  * Blocking callback handler for per-ID lock
270  *
271  * \param lock - is the lock for which ast occurred.
272  * \param desc - is the description of a conflicting lock in case of blocking
273  *               ast.
274  * \param data - is the value of lock->l_ast_data
275  * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
276  *               cancellation and blocking ast's.
277  */
278 static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
279                                void *data, int flag)
280 {
281         struct lustre_handle    lockh;
282         int                     rc = 0;
283         ENTRY;
284
285         switch(flag) {
286         case LDLM_CB_BLOCKING: {
287
288                 LDLM_DEBUG(lock, "blocking AST on ID quota lock");
289                 ldlm_lock2handle(lock, &lockh);
290                 rc = ldlm_cli_cancel(&lockh);
291                 break;
292         }
293         case LDLM_CB_CANCELING: {
294                 struct lu_env           *env;
295                 struct lquota_entry     *lqe;
296                 bool                     rel = false;
297
298                 LDLM_DEBUG(lock, "canceling global quota lock");
299                 lqe = qsd_id_ast_data_get(lock, true);
300                 if (lqe == NULL)
301                         break;
302
303                 LQUOTA_DEBUG(lqe, "losing ID lock");
304
305                 /* just local cancel (for stack clean up or eviction), don't
306                  * release quota space in this case */
307                 if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) != 0) {
308                         lqe_putref(lqe);
309                         break;
310                 }
311
312                 /* allocate environment */
313                 OBD_ALLOC_PTR(env);
314                 if (env == NULL) {
315                         lqe_putref(lqe);
316                         rc = -ENOMEM;
317                         break;
318                 }
319
320                 /* initialize environment */
321                 rc = lu_env_init(env, LCT_DT_THREAD);
322                 if (rc) {
323                         OBD_FREE_PTR(env);
324                         lqe_putref(lqe);
325                         break;
326                 }
327
328                 ldlm_lock2handle(lock, &lockh);
329                 lqe_write_lock(lqe);
330                 if (lustre_handle_equal(&lockh, &lqe->lqe_lockh)) {
331                         /* Clear lqe_lockh & reset qunit to 0 */
332                         qsd_set_qunit(lqe, 0);
333                         memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
334                         lqe->lqe_edquot = false;
335                         rel = true;
336                 }
337                 lqe_write_unlock(lqe);
338
339                 /* If there is qqacq inflight, the release will be skipped
340                  * at this time, and triggered on dqacq completion later,
341                  * which means there could be a short window that slave is
342                  * holding spare grant wihtout per-ID lock. */
343                 if (rel)
344                         rc = qsd_adjust(env, lqe);
345
346                 /* release lqe reference grabbed by qsd_id_ast_data_get() */
347                 lqe_putref(lqe);
348                 lu_env_fini(env);
349                 OBD_FREE_PTR(env);
350                 break;
351         }
352         default:
353                 LASSERTF(0, "invalid flags for blocking ast %d", flag);
354         }
355
356         RETURN(rc);
357 }
358
359 /*
360  * Glimpse callback handler for per-ID quota locks.
361  *
362  * \param lock - is the lock targeted by the glimpse
363  * \param data - is a pointer to the glimpse ptlrpc request
364  */
365 static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
366 {
367         struct ptlrpc_request           *req = data;
368         struct lquota_entry             *lqe;
369         struct qsd_instance             *qsd;
370         struct ldlm_gl_lquota_desc      *desc;
371         struct lquota_lvb               *lvb;
372         int                              rc;
373         bool                             wakeup = false;
374         ENTRY;
375
376         rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
377         if (rc)
378                 GOTO(out, rc);
379
380         lqe = qsd_id_ast_data_get(lock, false);
381         if (lqe == NULL)
382                 /* valid race */
383                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
384
385         LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:"LPU64,
386                      desc->gl_qunit);
387
388         qsd = lqe2qqi(lqe)->qqi_qsd;
389
390         lqe_write_lock(lqe);
391         lvb->lvb_id_rel = 0;
392         if (desc->gl_qunit != 0 && desc->gl_qunit != lqe->lqe_qunit) {
393                 long long space;
394
395                 /* extract new qunit from glimpse request */
396                 qsd_set_qunit(lqe, desc->gl_qunit);
397
398                 space  = lqe->lqe_granted - lqe->lqe_pending_rel;
399                 space -= lqe->lqe_usage;
400                 space -= lqe->lqe_pending_write + lqe->lqe_waiting_write;
401                 space -= lqe->lqe_qunit;
402
403                 if (space > 0) {
404                         if (lqe->lqe_pending_req > 0) {
405                                 LQUOTA_DEBUG(lqe, "request in flight, postpone "
406                                              "release of "LPD64, space);
407                                 lvb->lvb_id_may_rel = space;
408                         } else {
409                                 lqe->lqe_pending_req++;
410
411                                 /* release quota space in glimpse reply */
412                                 LQUOTA_DEBUG(lqe, "releasing "LPD64, space);
413                                 lqe->lqe_granted -= space;
414                                 lvb->lvb_id_rel   = space;
415
416                                 lqe_write_unlock(lqe);
417                                 /* change the lqe_granted */
418                                 qsd_upd_schedule(lqe2qqi(lqe), lqe, &lqe->lqe_id,
419                                                  (union lquota_rec *)&lqe->lqe_granted,
420                                                  0, false);
421                                 lqe_write_lock(lqe);
422
423                                 lqe->lqe_pending_req--;
424                                 wakeup = true;
425                         }
426                 }
427         }
428
429         lqe->lqe_edquot = !!(desc->gl_flags & LQUOTA_FL_EDQUOT);
430         lqe_write_unlock(lqe);
431
432         if (wakeup)
433                 cfs_waitq_broadcast(&lqe->lqe_waiters);
434         lqe_putref(lqe);
435 out:
436         req->rq_status = rc;
437         RETURN(rc);
438 }
439
440 struct ldlm_enqueue_info qsd_id_einfo = { LDLM_PLAIN,
441                                           LCK_CR,
442                                           qsd_id_blocking_ast,
443                                           ldlm_completion_ast,
444                                           qsd_id_glimpse_ast,
445                                           NULL, NULL };
446
447 /*
448  * Check whether a slave already own a ldlm lock for the quota identifier \qid.
449  *
450  * \param lockh  - is the local lock handle from lquota entry.
451  * \param rlockh - is the remote lock handle of the matched lock, if any.
452  *
453  * \retval 0      : on successful look up and \lockh contains the lock handle.
454  * \retval -ENOENT: no lock found
455  */
456 int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh)
457 {
458         struct ldlm_lock        *lock;
459         int                      rc;
460         ENTRY;
461
462         LASSERT(lockh);
463
464         if (!lustre_handle_is_used(lockh))
465                 RETURN(-ENOENT);
466
467         rc = ldlm_lock_addref_try(lockh, qsd_id_einfo.ei_mode);
468         if (rc)
469                 RETURN(-ENOENT);
470
471         LASSERT(lustre_handle_is_used(lockh));
472         ldlm_lock_dump_handle(D_QUOTA, lockh);
473
474         if (rlockh == NULL)
475                 RETURN(0);
476
477         /* look up lock associated with local handle and extract remote handle
478          * to be packed in quota request */
479         lock = ldlm_handle2lock(lockh);
480         LASSERT(lock != NULL);
481         lustre_handle_copy(rlockh, &lock->l_remote_handle);
482         LDLM_LOCK_PUT(lock);
483
484         RETURN(0);
485 }
486
487 int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe)
488 {
489         struct qsd_thread_info  *qti = qsd_info(env);
490         int                      rc;
491         ENTRY;
492
493         lqe_write_lock(lqe);
494         if (lqe->lqe_pending_write || lqe->lqe_waiting_write ||
495             lqe->lqe_usage || lqe->lqe_granted) {
496                 lqe_write_unlock(lqe);
497                 RETURN(0);
498         }
499
500         lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
501         if (lustre_handle_is_used(&qti->qti_lockh)) {
502                 memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
503                 qsd_set_qunit(lqe, 0);
504                 lqe->lqe_edquot = false;
505         }
506         lqe_write_unlock(lqe);
507
508         rc = qsd_id_lock_match(&qti->qti_lockh, NULL);
509         if (rc)
510                 RETURN(rc);
511
512         ldlm_lock_decref_and_cancel(&qti->qti_lockh, qsd_id_einfo.ei_mode);
513         RETURN(0);
514 }