/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_MDS_GROUP(lsm->lsm_object_gr);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
        }

        RETURN(lmm_size);
}
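
/*
 * Usage sketch (editorial, illustrative only): osc_packmd() has three modes,
 * selected by its pointer arguments, which follow directly from the branches
 * above ('exp', 'lmm' and 'lsm' are hypothetical caller variables):
 *
 *   int size = osc_packmd(exp, NULL, lsm);   -- query the packed size only
 *   osc_packmd(exp, &lmm, lsm);              -- alloc (if needed) and pack
 *   osc_packmd(exp, &lmm, NULL);             -- free a previously packed lmm
 *
 * osc_unpackmd() below mirrors the same convention for the reverse direction.
 */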

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                 CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
                 "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
                 oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
        }

        /* do MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
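
/*
 * Editorial note on the calling convention above: a NULL 'rqset' means
 * fire-and-forget -- the request is handed straight to ptlrpcd and no reply
 * interpretation happens; a non-NULL set installs osc_setattr_interpret()
 * as the reply callback, so the caller's oi_cb_up() upcall runs with the
 * unpacked reply (or the error) on completion.
 */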

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

static int osc_punch_interpret(const struct lu_env *env,
                               struct ptlrpc_request *req,
                               struct osc_punch_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(aa->pa_oa, &body->oa);
out:
        rc = aa->pa_upcall(aa->pa_cookie, rc);
        RETURN(rc);
}

int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   struct obd_capa *capa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_punch_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->pa_oa     = oa;
        aa->pa_upcall = upcall;
        aa->pa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
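
/*
 * Editorial note: as with osc_sync() below, the punch extent travels in the
 * overloaded o_size/o_blocks fields of the obdo -- o_size carries the start
 * offset and o_blocks the end, with OBD_MD_FLSIZE|OBD_MD_FLBLOCKS set so
 * the OST knows both fields are meaningful.  osc_punch() below does exactly
 * this translation from the ldlm extent policy.
 */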

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
                              oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Find and cancel locally all locks matching @mode in the resource named by
 * @oa. Matched locks are added to the @cancels list. Returns the number of
 * locks added. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

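/*
 * Throttle check for destroy RPCs (editorial sketch of the pattern below):
 * the in-flight counter is optimistically incremented first; if that pushed
 * us over cl_max_rpcs_in_flight the slot is given back with a decrement.
 * Because osc_destroy_interpret() may have decremented between the two
 * atomic operations, the post-decrement value is re-checked and the
 * waitqueue signalled so no waiter is lost.
 */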
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything at
 * all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = { 0 };

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * below cl_max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) -
                   atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
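
/*
 * Illustrative arithmetic for the o_undirty computation above (assuming
 * 4 KiB pages): with cl_max_pages_per_rpc = 256 and cl_max_rpcs_in_flight
 * = 8, max_in_flight = 256 * 4096 * (8 + 1) = 9 MiB, so the client
 * advertises at least a full pipeline's worth of headroom to the OST.
 */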

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* The companion to osc_consume_write_grant, called when a brw has completed.
 * Must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
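
/*
 * Worked example for the short-write branch above (illustrative, assuming
 * 4096-byte pages and a 512-byte OST blocksize): a 700-byte write starting
 * at offset 100 within its page gives count = 700 + 100 = 800 and
 * end = 800 & 511 = 288, so count is rounded up to 1024 (two full blocks)
 * and cl_lost_grant grows by 4096 - 1024 = 3072 bytes.
 */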

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if we still have dirty cache but no grant, wait for pending
                 * RPCs that may yet return us some grant before doing sync
                 * writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_FREE_PTR(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}
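
/*
 * Editorial example of the two-stage shrink above: with
 * cl_max_rpcs_in_flight = 8 and cl_max_pages_per_rpc = 256, the first
 * shrink targets (8 + 1) * 256 = 2304 units of grant; once avail_grant is
 * at or below that, a later shrink targets just 256 (one RPC's worth),
 * which osc_shrink_grant_to_target() below also enforces as the floor.
 */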

int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}
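
/*
 * Editorial note on the timing check above: the deadline is treated as due
 * slightly early (within 5 clock ticks of cl_next_shrink_grant), which -- as
 * I read it -- avoids skipping a shrink window when the periodic timeout
 * callback fires just before the exact deadline.  Shrinking only proceeds
 * while the import is FULL and more than GRANT_SHRINK_LIMIT (one maximum
 * BRW) of grant remains; otherwise the deadline is simply pushed out again.
 */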

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                        client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
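
/*
 * Worked example (illustrative, 4096-byte pages): a 3-page read returns
 * nob_read = 5000.  Page 0 is fully covered (5000 - 4096 = 904 left),
 * page 1 gets its tail zeroed from byte 904 onward, and the loop at the
 * bottom zeroes page 2 entirely -- so the caller sees the sparse region
 * as zeroes rather than stale page contents.
 */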

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int    *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }
        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|
                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
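
/*
 * Editorial example: two pages merge into one remote niobuf only when their
 * flags are identical and they are byte-contiguous in the object -- e.g.
 * {off = 0, count = 4096} followed by {off = 4096, count = 4096} merges,
 * while a gap or differing OBD_BRW_* flags forces a new niobuf (and a
 * higher niocount in osc_brw_prep_request() below).
 */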

static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute a wrong checksum instead of corrupting
         * the data, so it is still correct on a resend */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
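
/*
 * Editorial note: in the loop above the last page may be only partially
 * covered -- 'count' is clamped to the remaining 'nob', so e.g. nob = 5000
 * over 4096-byte pages checksums all of page 0 and just 904 bytes of
 * page 1, matching the extent of data actually transferred.
 */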
1219
1220 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1221                                 struct lov_stripe_md *lsm, obd_count page_count,
1222                                 struct brw_page **pga,
1223                                 struct ptlrpc_request **reqp,
1224                                 struct obd_capa *ocapa, int reserve)
1225 {
1226         struct ptlrpc_request   *req;
1227         struct ptlrpc_bulk_desc *desc;
1228         struct ost_body         *body;
1229         struct obd_ioobj        *ioobj;
1230         struct niobuf_remote    *niobuf;
1231         int niocount, i, requested_nob, opc, rc;
1232         struct osc_brw_async_args *aa;
1233         struct req_capsule      *pill;
1234         struct brw_page *pg_prev;
1235
1236         ENTRY;
1237         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1238                 RETURN(-ENOMEM); /* Recoverable */
1239         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1240                 RETURN(-EINVAL); /* Fatal */
1241
1242         if ((cmd & OBD_BRW_WRITE) != 0) {
1243                 opc = OST_WRITE;
1244                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1245                                                 cli->cl_import->imp_rq_pool,
1246                                                 &RQF_OST_BRW);
1247         } else {
1248                 opc = OST_READ;
1249                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1250         }
1251         if (req == NULL)
1252                 RETURN(-ENOMEM);
1253
1254         for (niocount = i = 1; i < page_count; i++) {
1255                 if (!can_merge_pages(pga[i - 1], pga[i]))
1256                         niocount++;
1257         }
1258
1259         pill = &req->rq_pill;
1260         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1261                              niocount * sizeof(*niobuf));
1262         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1263
1264         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1265         if (rc) {
1266                 ptlrpc_request_free(req);
1267                 RETURN(rc);
1268         }
1269         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1270         ptlrpc_at_set_req_timeout(req);
1271
1272         if (opc == OST_WRITE)
1273                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1274                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1275         else
1276                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1277                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1278
1279         if (desc == NULL)
1280                 GOTO(out, rc = -ENOMEM);
1281         /* NB request now owns desc and will free it when it gets freed */
1282
1283         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1284         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1285         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1286         LASSERT(body && ioobj && niobuf);
1287
1288         lustre_set_wire_obdo(&body->oa, oa);
1289
1290         obdo_to_ioobj(oa, ioobj);
1291         ioobj->ioo_bufcnt = niocount;
1292         osc_pack_capa(req, body, ocapa);
1293         LASSERT (page_count > 0);
1294         pg_prev = pga[0];
1295         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1296                 struct brw_page *pg = pga[i];
1297
1298                 LASSERT(pg->count > 0);
1299                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1300                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1301                          pg->off, pg->count);
1302 #ifdef __linux__
1303                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1304                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1305                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1306                          i, page_count,
1307                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1308                          pg_prev->pg, page_private(pg_prev->pg),
1309                          pg_prev->pg->index, pg_prev->off);
1310 #else
1311                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1312                          "i %d p_c %u\n", i, page_count);
1313 #endif
1314                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1315                         (pg->flag & OBD_BRW_SRVLOCK));
1316
1317                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1318                                       pg->count);
1319                 requested_nob += pg->count;
1320
1321                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1322                         niobuf--;
1323                         niobuf->len += pg->count;
1324                 } else {
1325                         niobuf->offset = pg->off;
1326                         niobuf->len    = pg->count;
1327                         niobuf->flags  = pg->flag;
1328                 }
1329                 pg_prev = pg;
1330         }
1331
1332         LASSERTF((void *)(niobuf - niocount) ==
1333                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1334                                niocount * sizeof(*niobuf)),
1335                 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1336                 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1337                 (void *)(niobuf - niocount));
1338
1339         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1340         if (osc_should_shrink_grant(cli))
1341                 osc_shrink_grant_local(cli, &body->oa);
1342
1343         /* size[REQ_REC_OFF] still sizeof (*body) */
1344         if (opc == OST_WRITE) {
1345                 if (unlikely(cli->cl_checksum) &&
1346                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1347                         /* store cl_cksum_type in a local variable since
1348                          * it can be changed via lprocfs */
1349                         cksum_type_t cksum_type = cli->cl_cksum_type;
1350
1351                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1352                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1353                                 body->oa.o_flags = 0;
1354                         }
1355                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1356                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1357                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1358                                                              page_count, pga,
1359                                                              OST_WRITE,
1360                                                              cksum_type);
1361                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1362                                body->oa.o_cksum);
1363                         /* save this in 'oa', too, for later checking */
1364                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1365                         oa->o_flags |= cksum_type_pack(cksum_type);
1366                 } else {
1367                         /* clear out the checksum flag, in case this is a
1368                          * resend but cl_checksum is no longer set. b=11238 */
1369                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1370                 }
1371                 oa->o_cksum = body->oa.o_cksum;
1372                 /* 1 RC per niobuf */
1373                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1374                                      sizeof(__u32) * niocount);
1375         } else {
1376                 if (unlikely(cli->cl_checksum) &&
1377                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1378                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1379                                 body->oa.o_flags = 0;
1380                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1381                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1382                 }
1383                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1384                 /* 1 RC for the whole I/O */
1385         }
1386         ptlrpc_request_set_replen(req);
1387
1388         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1389         aa = ptlrpc_req_async_args(req);
1390         aa->aa_oa = oa;
1391         aa->aa_requested_nob = requested_nob;
1392         aa->aa_nio_count = niocount;
1393         aa->aa_page_count = page_count;
1394         aa->aa_resends = 0;
1395         aa->aa_ppga = pga;
1396         aa->aa_cli = cli;
1397         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1398         if (ocapa && reserve)
1399                 aa->aa_ocapa = capa_get(ocapa);
1400
1401         *reqp = req;
1402         RETURN(0);
1403
1404  out:
1405         ptlrpc_req_finished(req);
1406         RETURN(rc);
1407 }
1408
1409 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1410                                 __u32 client_cksum, __u32 server_cksum, int nob,
1411                                 obd_count page_count, struct brw_page **pga,
1412                                 cksum_type_t client_cksum_type)
1413 {
1414         __u32 new_cksum;
1415         char *msg;
1416         cksum_type_t cksum_type;
1417
1418         if (server_cksum == client_cksum) {
1419                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1420                 return 0;
1421         }
1422
1423         if (oa->o_valid & OBD_MD_FLFLAGS)
1424                 cksum_type = cksum_type_unpack(oa->o_flags);
1425         else
1426                 cksum_type = OBD_CKSUM_CRC32;
1427
1428         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1429                                       cksum_type);
1430
1431         if (cksum_type != client_cksum_type)
1432                 msg = "the server did not use the checksum type specified in "
1433                       "the original request - likely a protocol problem";
1434         else if (new_cksum == server_cksum)
1435                 msg = "changed on the client after we checksummed it - "
1436                       "likely false positive due to mmap IO (bug 11742)";
1437         else if (new_cksum == client_cksum)
1438                 msg = "changed in transit before arrival at OST";
1439         else
1440                 msg = "changed in transit AND doesn't match the original - "
1441                       "likely false positive due to mmap IO (bug 11742)";
1442
1443         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1444                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1445                            "["LPU64"-"LPU64"]\n",
1446                            msg, libcfs_nid2str(peer->nid),
1447                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1448                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1449                                                         (__u64)0,
1450                            oa->o_id,
1451                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1452                            pga[0]->off,
1453                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1454         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1455                "client csum now %x\n", client_cksum, client_cksum_type,
1456                server_cksum, cksum_type, new_cksum);
1457         return 1;
1458 }
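/*
 * Editor's summary of the diagnosis above: barring a checksum-type
 * mismatch, recomputing the checksum on the client distinguishes three
 * cases.  A match with the server's value means the pages changed after
 * the original checksum was taken (typically mmap IO); a match with the
 * original client value means the data is unchanged locally, so it was
 * corrupted in transit; matching neither suggests both happened.
 */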
1459
1460 /* Note rc enters this function as number of bytes transferred */
1461 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1462 {
1463         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1464         const lnet_process_id_t *peer =
1465                         &req->rq_import->imp_connection->c_peer;
1466         struct client_obd *cli = aa->aa_cli;
1467         struct ost_body *body;
1468         __u32 client_cksum = 0;
1469         ENTRY;
1470
1471         if (rc < 0 && rc != -EDQUOT)
1472                 RETURN(rc);
1473
1474         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1475         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1476                                   lustre_swab_ost_body);
1477         if (body == NULL) {
1478                 CDEBUG(D_INFO, "Can't unpack body\n");
1479                 RETURN(-EPROTO);
1480         }
1481
1482         /* set/clear over quota flag for a uid/gid */
1483         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1484             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1485                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1486
1487                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1488                              body->oa.o_flags);
1489         }
1490
1491         if (rc < 0)
1492                 RETURN(rc);
1493
1494         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1495                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1496
1497         osc_update_grant(cli, body);
1498
1499         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1500                 if (rc > 0) {
1501                         CERROR("Unexpected +ve rc %d\n", rc);
1502                         RETURN(-EPROTO);
1503                 }
1504                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1505
1506                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1507                         RETURN(-EAGAIN);
1508
1509                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1510                     check_write_checksum(&body->oa, peer, client_cksum,
1511                                          body->oa.o_cksum, aa->aa_requested_nob,
1512                                          aa->aa_page_count, aa->aa_ppga,
1513                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1514                         RETURN(-EAGAIN);
1515
1516                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1517                                      aa->aa_page_count, aa->aa_ppga);
1518                 GOTO(out, rc);
1519         }
1520
1521         /* The rest of this function executes only for OST_READs */
1522
1523         /* if unwrap_bulk failed, return -EAGAIN to retry */
1524         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1525         if (rc < 0)
1526                 GOTO(out, rc = -EAGAIN);
1527
1528         if (rc > aa->aa_requested_nob) {
1529                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1530                        aa->aa_requested_nob);
1531                 RETURN(-EPROTO);
1532         }
1533
1534         if (rc != req->rq_bulk->bd_nob_transferred) {
1535                 CERROR("Unexpected rc %d (%d transferred)\n",
1536                        rc, req->rq_bulk->bd_nob_transferred);
1537                 RETURN(-EPROTO);
1538         }
1539
1540         if (rc < aa->aa_requested_nob)
1541                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1542
1543         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1544                 static int cksum_counter;
1545                 __u32      server_cksum = body->oa.o_cksum;
1546                 char      *via;
1547                 char      *router;
1548                 cksum_type_t cksum_type;
1549
1550                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1551                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1552                 else
1553                         cksum_type = OBD_CKSUM_CRC32;
1554                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1555                                                  aa->aa_ppga, OST_READ,
1556                                                  cksum_type);
1557
1558                 if (peer->nid == req->rq_bulk->bd_sender) {
1559                         via = router = "";
1560                 } else {
1561                         via = " via ";
1562                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1563                 }
1564
1565                 if (server_cksum == ~0 && rc > 0) {
1566                         CERROR("Protocol error: server %s set the 'checksum' "
1567                                "bit, but didn't send a checksum.  Not fatal, "
1568                                "but please report it at http://bugzilla.lustre.org/\n",
1569                                libcfs_nid2str(peer->nid));
1570                 } else if (server_cksum != client_cksum) {
1571                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1572                                            "%s%s%s inum "LPU64"/"LPU64" object "
1573                                            LPU64"/"LPU64" extent "
1574                                            "["LPU64"-"LPU64"]\n",
1575                                            req->rq_import->imp_obd->obd_name,
1576                                            libcfs_nid2str(peer->nid),
1577                                            via, router,
1578                                            body->oa.o_valid & OBD_MD_FLFID ?
1579                                                 body->oa.o_fid : (__u64)0,
1580                                            body->oa.o_valid & OBD_MD_FLFID ?
1581                                                 body->oa.o_generation :(__u64)0,
1582                                            body->oa.o_id,
1583                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1584                                                 body->oa.o_gr : (__u64)0,
1585                                            aa->aa_ppga[0]->off,
1586                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1587                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1588                                                                         1);
1589                         CERROR("client %x, server %x, cksum_type %x\n",
1590                                client_cksum, server_cksum, cksum_type);
1591                         cksum_counter = 0;
1592                         aa->aa_oa->o_cksum = client_cksum;
1593                         rc = -EAGAIN;
1594                 } else {
1595                         cksum_counter++;
1596                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1597                         rc = 0;
1598                 }
1599         } else if (unlikely(client_cksum)) {
1600                 static int cksum_missed;
1601
1602                 cksum_missed++;
1603                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1604                         CERROR("Checksum %u requested from %s but not sent\n",
1605                                cksum_missed, libcfs_nid2str(peer->nid));
1606         } else {
1607                 rc = 0;
1608         }
1609 out:
1610         if (rc >= 0)
1611                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1612
1613         RETURN(rc);
1614 }
1615
1616 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1617                             struct lov_stripe_md *lsm,
1618                             obd_count page_count, struct brw_page **pga,
1619                             struct obd_capa *ocapa)
1620 {
1621         struct ptlrpc_request *req;
1622         int                    rc;
1623         cfs_waitq_t            waitq;
1624         int                    resends = 0;
1625         struct l_wait_info     lwi;
1626
1627         ENTRY;
1628
1629         cfs_waitq_init(&waitq);
1630
1631 restart_bulk:
1632         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1633                                   page_count, pga, &req, ocapa, 0);
1634         if (rc != 0)
1635                 RETURN(rc);
1636
1637         rc = ptlrpc_queue_wait(req);
1638
1639         if (rc == -ETIMEDOUT && req->rq_resend) {
1640                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1641                 ptlrpc_req_finished(req);
1642                 goto restart_bulk;
1643         }
1644
1645         rc = osc_brw_fini_request(req, rc);
1646
1647         ptlrpc_req_finished(req);
1648         if (osc_recoverable_error(rc)) {
1649                 resends++;
1650                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1651                         CERROR("too many resend retries, returning error\n");
1652                         RETURN(-EIO);
1653                 }
1654
1655                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1656                 l_wait_event(waitq, 0, &lwi);
1657
1658                 goto restart_bulk;
1659         }
1660
1661         RETURN(rc);
1662 }
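/*
 * Editor's note on the retry path above: each recoverable failure sleeps
 * for 'resends' seconds (LWI_TIMEOUT_INTR on a condition that never
 * becomes true) before rebuilding and resending the bulk, i.e. a linear
 * backoff of 1s, 2s, 3s, ... until osc_should_resend() gives up.
 */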
1663
1664 int osc_brw_redo_request(struct ptlrpc_request *request,
1665                          struct osc_brw_async_args *aa)
1666 {
1667         struct ptlrpc_request *new_req;
1668         struct ptlrpc_request_set *set = request->rq_set;
1669         struct osc_brw_async_args *new_aa;
1670         struct osc_async_page *oap;
1671         int rc = 0;
1672         ENTRY;
1673
1674         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1675                 CERROR("too many resend retries, returning error\n");
1676                 RETURN(-EIO);
1677         }
1678
1679         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1680
1681         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1682                                         OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1683                                   aa->aa_cli, aa->aa_oa,
1684                                   NULL /* lsm unused by osc currently */,
1685                                   aa->aa_page_count, aa->aa_ppga,
1686                                   &new_req, aa->aa_ocapa, 0);
1687         if (rc)
1688                 RETURN(rc);
1689
1690         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1691
1692         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1693                 if (oap->oap_request != NULL) {
1694                         LASSERTF(request == oap->oap_request,
1695                                  "request %p != oap_request %p\n",
1696                                  request, oap->oap_request);
1697                         if (oap->oap_interrupted) {
1698                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1699                                 ptlrpc_req_finished(new_req);
1700                                 RETURN(-EINTR);
1701                         }
1702                 }
1703         }
1704         /* New request takes over pga and oaps from old request.
1705          * Note that copying a list_head doesn't work, need to move it... */
1706         aa->aa_resends++;
1707         new_req->rq_interpret_reply = request->rq_interpret_reply;
1708         new_req->rq_async_args = request->rq_async_args;
1709         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1710
1711         new_aa = ptlrpc_req_async_args(new_req);
1712
1713         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1714         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1715         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1716
1717         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1718                 if (oap->oap_request) {
1719                         ptlrpc_req_finished(oap->oap_request);
1720                         oap->oap_request = ptlrpc_request_addref(new_req);
1721                 }
1722         }
1723
1724         new_aa->aa_ocapa = aa->aa_ocapa;
1725         aa->aa_ocapa = NULL;
1726
1727         /* Using ptlrpc_set_add_req() here is safe because interpret
1728          * functions run in check_set context.  The only path by which
1729          * another thread can reach the request is the -EINTR case, and
1730          * that path is protected by cl_loi_list_lock. */
1731         ptlrpc_set_add_req(set, new_req);
1732
1733         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1734
1735         DEBUG_REQ(D_INFO, new_req, "new request");
1736         RETURN(0);
1737 }
1738
1739 /*
1740  * We want disk allocation on the target to happen in offset order, so
1741  * we follow Sedgewick's advice and stick to the dead simple shellsort --
1742  * it'll do fine for our small page arrays and doesn't require allocation.
1743  * It's an insertion sort that swaps elements that are strides apart,
1744  * shrinking the stride down until it's 1 and the array is sorted.
1745  */
1746 static void sort_brw_pages(struct brw_page **array, int num)
1747 {
1748         int stride, i, j;
1749         struct brw_page *tmp;
1750
1751         if (num == 1)
1752                 return;
1753         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1754                 ;
1755
1756         do {
1757                 stride /= 3;
1758                 for (i = stride ; i < num ; i++) {
1759                         tmp = array[i];
1760                         j = i;
1761                         while (j >= stride && array[j - stride]->off > tmp->off) {
1762                                 array[j] = array[j - stride];
1763                                 j -= stride;
1764                         }
1765                         array[j] = tmp;
1766                 }
1767         } while (stride > 1);
1768 }
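/*
 * Illustrative sketch (editor's note): for num == 100 the ascending loop
 * above generates strides 1, 4, 13, 40, 121 and stops at 121; the
 * do/while then sorts with strides 40, 13 and 4, finishing with a plain
 * insertion sort at stride 1.
 */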
1769
1770 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1771 {
1772         int count = 1;
1773         int offset;
1774         int i = 0;
1775
1776         LASSERT(pages > 0);
1777         offset = pg[i]->off & ~CFS_PAGE_MASK;
1778
1779         for (;;) {
1780                 pages--;
1781                 if (pages == 0)         /* that's all */
1782                         return count;
1783
1784                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1785                         return count;   /* doesn't end on page boundary */
1786
1787                 i++;
1788                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1789                 if (offset != 0)        /* doesn't start on page boundary */
1790                         return count;
1791
1792                 count++;
1793         }
1794 }
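/*
 * Example (editor's note, assuming 4K pages): for the sorted array
 * { off=0/count=4096, off=4096/count=2048, off=8192/count=4096 } this
 * returns 2 -- the second page ends mid-page, so the third page cannot
 * join the same unfragmented RDMA transfer.
 */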
1795
1796 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1797 {
1798         struct brw_page **ppga;
1799         int i;
1800
1801         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1802         if (ppga == NULL)
1803                 return NULL;
1804
1805         for (i = 0; i < count; i++)
1806                 ppga[i] = pga + i;
1807         return ppga;
1808 }
1809
1810 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1811 {
1812         LASSERT(ppga != NULL);
1813         OBD_FREE(ppga, sizeof(*ppga) * count);
1814 }
1815
1816 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1817                    obd_count page_count, struct brw_page *pga,
1818                    struct obd_trans_info *oti)
1819 {
1820         struct obdo *saved_oa = NULL;
1821         struct brw_page **ppga, **orig;
1822         struct obd_import *imp = class_exp2cliimp(exp);
1823         struct client_obd *cli;
1824         int rc, page_count_orig;
1825         ENTRY;
1826
1827         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1828         cli = &imp->imp_obd->u.cli;
1829
1830         if (cmd & OBD_BRW_CHECK) {
1831                 /* The caller just wants to know if there's a chance that this
1832                  * I/O can succeed */
1833
1834                 if (imp->imp_invalid)
1835                         RETURN(-EIO);
1836                 RETURN(0);
1837         }
1838
1839         /* test_brw with a failed create can trip this, maybe others. */
1840         LASSERT(cli->cl_max_pages_per_rpc);
1841
1842         rc = 0;
1843
1844         orig = ppga = osc_build_ppga(pga, page_count);
1845         if (ppga == NULL)
1846                 RETURN(-ENOMEM);
1847         page_count_orig = page_count;
1848
1849         sort_brw_pages(ppga, page_count);
1850         while (page_count) {
1851                 obd_count pages_per_brw;
1852
1853                 if (page_count > cli->cl_max_pages_per_rpc)
1854                         pages_per_brw = cli->cl_max_pages_per_rpc;
1855                 else
1856                         pages_per_brw = page_count;
1857
1858                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1859
1860                 if (saved_oa != NULL) {
1861                         /* restore previously saved oa */
1862                         *oinfo->oi_oa = *saved_oa;
1863                 } else if (page_count > pages_per_brw) {
1864                         /* save a copy of oa (brw will clobber it) */
1865                         OBDO_ALLOC(saved_oa);
1866                         if (saved_oa == NULL)
1867                                 GOTO(out, rc = -ENOMEM);
1868                         *saved_oa = *oinfo->oi_oa;
1869                 }
1870
1871                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1872                                       pages_per_brw, ppga, oinfo->oi_capa);
1873
1874                 if (rc != 0)
1875                         break;
1876
1877                 page_count -= pages_per_brw;
1878                 ppga += pages_per_brw;
1879         }
1880
1881 out:
1882         osc_release_ppga(orig, page_count_orig);
1883
1884         if (saved_oa != NULL)
1885                 OBDO_FREE(saved_oa);
1886
1887         RETURN(rc);
1888 }
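/*
 * Editor's sketch of the loop above with assumed tunables: with
 * page_count == 600 and cl_max_pages_per_rpc == 256, osc_brw() issues
 * BRW RPCs of 256, 256 and 88 pages, cut shorter whenever
 * max_unfragmented_pages() finds a partial page inside the run.
 */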
1889
1890 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1891  * the dirty accounting: either writeback completed or a truncate happened
1892  * before writing started.  Must be called with the loi lock held. */
1893 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1894                            int sent)
1895 {
1896         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1897 }
1898
1899
1900 /* This maintains the lists of pending pages to read/write for a given object
1901  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1902  * to quickly find objects that are ready to send an RPC. */
1903 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1904                          int cmd)
1905 {
1906         int optimal;
1907         ENTRY;
1908
1909         if (lop->lop_num_pending == 0)
1910                 RETURN(0);
1911
1912         /* if we have an invalid import we want to drain the queued pages
1913          * by forcing them through rpcs that immediately fail and complete
1914          * the pages.  recovery relies on this to empty the queued pages
1915          * before canceling the locks and evicting down the llite pages */
1916         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1917                 RETURN(1);
1918
1919         /* stream rpcs in queue order as long as there is an urgent page
1920          * queued.  this is our cheap solution for good batching in the case
1921          * where writepage marks some random page in the middle of the file
1922          * as urgent because of, say, memory pressure */
1923         if (!list_empty(&lop->lop_urgent)) {
1924                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1925                 RETURN(1);
1926         }
1927         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1928         optimal = cli->cl_max_pages_per_rpc;
1929         if (cmd & OBD_BRW_WRITE) {
1930                 /* trigger a write rpc stream as long as there are dirtiers
1931                  * waiting for space.  as they're waiting, they're not going to
1932                  * create more pages to coalesce with what's waiting. */
1933                 if (!list_empty(&cli->cl_cache_waiters)) {
1934                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1935                         RETURN(1);
1936                 }
1937                 /* +16 to avoid triggering rpcs that would want to include pages
1938                  * that are being queued but which can't be made ready until
1939                  * the queuer finishes with the page. this is a wart for
1940                  * llite::commit_write() */
1941                 optimal += 16;
1942         }
1943         if (lop->lop_num_pending >= optimal)
1944                 RETURN(1);
1945
1946         RETURN(0);
1947 }
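/*
 * Editor's example with assumed tunables: with cl_max_pages_per_rpc ==
 * 256, a write lop with no urgent pages and no cache waiters only makes
 * an RPC once 272 (optimal + 16) pages are pending; a single urgent page
 * or a cache waiter triggers one immediately.
 */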
1948
1949 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1950 {
1951         struct osc_async_page *oap;
1952         ENTRY;
1953
1954         if (list_empty(&lop->lop_urgent))
1955                 RETURN(0);
1956
1957         oap = list_entry(lop->lop_urgent.next,
1958                          struct osc_async_page, oap_urgent_item);
1959
1960         if (oap->oap_async_flags & ASYNC_HP) {
1961                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1962                 RETURN(1);
1963         }
1964
1965         RETURN(0);
1966 }
1967
1968 static void on_list(struct list_head *item, struct list_head *list,
1969                     int should_be_on)
1970 {
1971         if (list_empty(item) && should_be_on)
1972                 list_add_tail(item, list);
1973         else if (!list_empty(item) && !should_be_on)
1974                 list_del_init(item);
1975 }
1976
1977 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1978  * can find pages to build into rpcs quickly */
1979 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1980 {
1981         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1982             lop_makes_hprpc(&loi->loi_read_lop)) {
1983                 /* HP rpc */
1984                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1985                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1986         } else {
1987                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1988                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1989                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1990                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1991         }
1992
1993         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1994                 loi->loi_write_lop.lop_num_pending);
1995
1996         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1997                 loi->loi_read_lop.lop_num_pending);
1998 }
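/*
 * Example of the resulting invariants (editor's note): an loi whose first
 * urgent write page is ASYNC_HP sits on cl_loi_hp_ready_list (and, having
 * pending writes, on cl_loi_write_list) but never on cl_loi_ready_list at
 * the same time; once the HP page is gone it returns to cl_loi_ready_list
 * only if lop_makes_rpc() still says an RPC is worth sending.
 */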
1999
2000 static void lop_update_pending(struct client_obd *cli,
2001                                struct loi_oap_pages *lop, int cmd, int delta)
2002 {
2003         lop->lop_num_pending += delta;
2004         if (cmd & OBD_BRW_WRITE)
2005                 cli->cl_pending_w_pages += delta;
2006         else
2007                 cli->cl_pending_r_pages += delta;
2008 }
2009
2010 /**
2011  * this is called when a sync waiter receives an interruption.  Its job is to
2012  * get the caller woken as soon as possible.  If its page hasn't been put in an
2013  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2014  * desiring interruption which will forcefully complete the rpc once the rpc
2015  * has timed out.
2016  */
2017 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2018 {
2019         struct loi_oap_pages *lop;
2020         struct lov_oinfo *loi;
2021         int rc = -EBUSY;
2022         ENTRY;
2023
2024         LASSERT(!oap->oap_interrupted);
2025         oap->oap_interrupted = 1;
2026
2027         /* ok, it's been put in an rpc. only one oap gets a request reference */
2028         if (oap->oap_request != NULL) {
2029                 ptlrpc_mark_interrupted(oap->oap_request);
2030                 ptlrpcd_wake(oap->oap_request);
2031                 ptlrpc_req_finished(oap->oap_request);
2032                 oap->oap_request = NULL;
2033         }
2034
2035         /*
2036          * page completion may be called only if ->cpo_prep() was executed
2037          * by osc_io_submit(), which also adds the page to the pending list
2038          */
2039         if (!list_empty(&oap->oap_pending_item)) {
2040                 list_del_init(&oap->oap_pending_item);
2041                 list_del_init(&oap->oap_urgent_item);
2042
2043                 loi = oap->oap_loi;
2044                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2045                         &loi->loi_write_lop : &loi->loi_read_lop;
2046                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2047                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2048                 rc = oap->oap_caller_ops->ap_completion(env,
2049                                           oap->oap_caller_data,
2050                                           oap->oap_cmd, NULL, -EINTR);
2051         }
2052
2053         RETURN(rc);
2054 }
2055
2056 /* this is trying to propagate async writeback errors back up to the
2057  * application.  When an async write fails we record the error code so the
2058  * app can see it later if it does an fsync.  As long as errors persist we
2059  * force future rpcs to be sync so that the app gets a sync error and the
2060  * cycle of queueing pages whose writeback will fail is broken. */
2061 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2062                            int rc)
2063 {
2064         if (rc) {
2065                 if (!ar->ar_rc)
2066                         ar->ar_rc = rc;
2067
2068                 ar->ar_force_sync = 1;
2069                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2070                 return;
2071
2072         }
2073
2074         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2075                 ar->ar_force_sync = 0;
2076 }
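/*
 * Editor's sketch with assumed xids: if an async write fails while the
 * next xid to be assigned is 103, ar_min_xid becomes 103 and ar_force_sync
 * is set; the first write with xid >= 103 that completes without error
 * clears ar_force_sync again, since it proves writes queued after the
 * failure are getting through.
 */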
2077
2078 void osc_oap_to_pending(struct osc_async_page *oap)
2079 {
2080         struct loi_oap_pages *lop;
2081
2082         if (oap->oap_cmd & OBD_BRW_WRITE)
2083                 lop = &oap->oap_loi->loi_write_lop;
2084         else
2085                 lop = &oap->oap_loi->loi_read_lop;
2086
2087         if (oap->oap_async_flags & ASYNC_HP)
2088                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2089         else if (oap->oap_async_flags & ASYNC_URGENT)
2090                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2091         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2092         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2093 }
2094
2095 /* this must be called holding the loi list lock to give coverage to exit_cache,
2096  * async_flag maintenance, and oap_request */
2097 static void osc_ap_completion(const struct lu_env *env,
2098                               struct client_obd *cli, struct obdo *oa,
2099                               struct osc_async_page *oap, int sent, int rc)
2100 {
2101         __u64 xid = 0;
2102
2103         ENTRY;
2104         if (oap->oap_request != NULL) {
2105                 xid = ptlrpc_req_xid(oap->oap_request);
2106                 ptlrpc_req_finished(oap->oap_request);
2107                 oap->oap_request = NULL;
2108         }
2109
2110         oap->oap_async_flags = 0;
2111         oap->oap_interrupted = 0;
2112
2113         if (oap->oap_cmd & OBD_BRW_WRITE) {
2114                 osc_process_ar(&cli->cl_ar, xid, rc);
2115                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2116         }
2117
2118         if (rc == 0 && oa != NULL) {
2119                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2120                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2121                 if (oa->o_valid & OBD_MD_FLMTIME)
2122                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2123                 if (oa->o_valid & OBD_MD_FLATIME)
2124                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2125                 if (oa->o_valid & OBD_MD_FLCTIME)
2126                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2127         }
2128
2129         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2130                                                 oap->oap_cmd, oa, rc);
2131
2132         /* ll_ap_completion (from llite) drops PG_locked, so a new
2133          * I/O on the page could start; but OSC calls it under lock
2134          * and thus we can safely add the oap back to pending */
2135         if (rc)
2136                 /* upper layer wants to leave the page on pending queue */
2137                 osc_oap_to_pending(oap);
2138         else
2139                 osc_exit_cache(cli, oap, sent);
2140         EXIT;
2141 }
2142
2143 static int brw_interpret(const struct lu_env *env,
2144                          struct ptlrpc_request *req, void *data, int rc)
2145 {
2146         struct osc_brw_async_args *aa = data;
2147         struct client_obd *cli;
2148         int async;
2149         ENTRY;
2150
2151         rc = osc_brw_fini_request(req, rc);
2152         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2153         if (osc_recoverable_error(rc)) {
2154                 rc = osc_brw_redo_request(req, aa);
2155                 if (rc == 0)
2156                         RETURN(0);
2157         }
2158
2159         if (aa->aa_ocapa) {
2160                 capa_put(aa->aa_ocapa);
2161                 aa->aa_ocapa = NULL;
2162         }
2163
2164         cli = aa->aa_cli;
2165
2166         client_obd_list_lock(&cli->cl_loi_list_lock);
2167
2168         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2169          * is called so we know whether to go to sync BRWs or wait for more
2170          * RPCs to complete */
2171         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2172                 cli->cl_w_in_flight--;
2173         else
2174                 cli->cl_r_in_flight--;
2175
2176         async = list_empty(&aa->aa_oaps);
2177         if (!async) { /* from osc_send_oap_rpc() */
2178                 struct osc_async_page *oap, *tmp;
2179                 /* the caller may re-use the oap after the completion call so
2180                  * we need to clean it up a little */
2181                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2182                         list_del_init(&oap->oap_rpc_item);
2183                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2184                 }
2185                 OBDO_FREE(aa->aa_oa);
2186         } else { /* from async_internal() */
2187                 int i;
2188                 for (i = 0; i < aa->aa_page_count; i++)
2189                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2190                
2191                 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2192                         OBDO_FREE(aa->aa_oa);
2193         }
2194         osc_wake_cache_waiters(cli);
2195         osc_check_rpcs(env, cli);
2196         client_obd_list_unlock(&cli->cl_loi_list_lock);
2197         if (!async)
2198                 cl_req_completion(env, aa->aa_clerq, rc);
2199         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2200         RETURN(rc);
2201 }
2202
2203 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2204                                             struct client_obd *cli,
2205                                             struct list_head *rpc_list,
2206                                             int page_count, int cmd)
2207 {
2208         struct ptlrpc_request *req;
2209         struct brw_page **pga = NULL;
2210         struct osc_brw_async_args *aa;
2211         struct obdo *oa = NULL;
2212         const struct obd_async_page_ops *ops = NULL;
2213         void *caller_data = NULL;
2214         struct osc_async_page *oap;
2215         struct osc_async_page *tmp;
2216         struct ost_body *body;
2217         struct cl_req *clerq = NULL;
2218         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2219         struct ldlm_lock *lock = NULL;
2220         struct cl_req_attr crattr;
2221         int i, rc;
2222
2223         ENTRY;
2224         LASSERT(!list_empty(rpc_list));
2225
2226         memset(&crattr, 0, sizeof crattr);
2227         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2228         if (pga == NULL)
2229                 GOTO(out, req = ERR_PTR(-ENOMEM));
2230
2231         OBDO_ALLOC(oa);
2232         if (oa == NULL)
2233                 GOTO(out, req = ERR_PTR(-ENOMEM));
2234
2235         i = 0;
2236         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2237                 struct cl_page *page = osc_oap2cl_page(oap);
2238                 if (ops == NULL) {
2239                         ops = oap->oap_caller_ops;
2240                         caller_data = oap->oap_caller_data;
2241
2242                         clerq = cl_req_alloc(env, page, crt,
2243                                              1 /* only 1-object rpcs for
2244                                                 * now */);
2245                         if (IS_ERR(clerq))
2246                                 GOTO(out, req = (void *)clerq);
2247                         lock = oap->oap_ldlm_lock;
2248                 }
2249                 pga[i] = &oap->oap_brw_page;
2250                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2251                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2252                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2253                 i++;
2254                 cl_req_page_add(env, clerq, page);
2255         }
2256
2257         /* always get the data for the obdo for the rpc */
2258         LASSERT(ops != NULL);
2259         crattr.cra_oa = oa;
2260         crattr.cra_capa = NULL;
2261         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2262         if (lock) {
2263                 oa->o_handle = lock->l_remote_handle;
2264                 oa->o_valid |= OBD_MD_FLHANDLE;
2265         }
2266
2267         rc = cl_req_prep(env, clerq);
2268         if (rc != 0) {
2269                 CERROR("cl_req_prep failed: %d\n", rc);
2270                 GOTO(out, req = ERR_PTR(rc));
2271         }
2272
2273         sort_brw_pages(pga, page_count);
2274         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2275                                   pga, &req, crattr.cra_capa, 1);
2276         if (rc != 0) {
2277                 CERROR("prep_req failed: %d\n", rc);
2278                 GOTO(out, req = ERR_PTR(rc));
2279         }
2280
2281         /* Need to update the timestamps after the request is built in case
2282          * we race with setattr (locally or in queue at OST).  If OST gets
2283          * later setattr before earlier BRW (as determined by the request xid),
2284          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2285          * way to do this in a single call.  bug 10150 */
2286         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2287         cl_req_attr_set(env, clerq, &crattr,
2288                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2289
2290         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2291         aa = ptlrpc_req_async_args(req);
2292         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2293         list_splice(rpc_list, &aa->aa_oaps);
2294         CFS_INIT_LIST_HEAD(rpc_list);
2295         aa->aa_clerq = clerq;
2296 out:
2297         capa_put(crattr.cra_capa);
2298         if (IS_ERR(req)) {
2299                 if (oa)
2300                         OBDO_FREE(oa);
2301                 if (pga)
2302                         OBD_FREE(pga, sizeof(*pga) * page_count);
2303                 /* this should happen rarely and is pretty bad; it makes the
2304                  * pending list stop following the dirty order */
2305                 client_obd_list_lock(&cli->cl_loi_list_lock);
2306                 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2307                         list_del_init(&oap->oap_rpc_item);
2308
2309                         /* queued sync pages can be torn down while the pages
2310                          * were between the pending list and the rpc */
2311                         if (oap->oap_interrupted) {
2312                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2313                                 osc_ap_completion(env, cli, NULL, oap, 0,
2314                                                   oap->oap_count);
2315                                 continue;
2316                         }
2317                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2318                 }
2319                 if (clerq && !IS_ERR(clerq))
2320                         cl_req_completion(env, clerq, PTR_ERR(req));
2321         }
2322         RETURN(req);
2323 }
2324
2325 /**
2326  * Prepare pages for ASYNC io and put them in the send queue.
2327  *
2328  * \param cli - the client obd to send from
2329  * \param loi - the object whose pages are being sent
2330  * \param cmd - OBD_BRW_* flags
2331  * \param lop - the pending pages of that object
2332  *
2333  * \return 1 if an RPC was built and queued for sending, 0 if there was
2334  * nothing ready to send, negative errno on error.
2335  */
2336 static int
2337 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2338                  struct lov_oinfo *loi,
2339                  int cmd, struct loi_oap_pages *lop)
2340 {
2341         struct ptlrpc_request *req;
2342         obd_count page_count = 0;
2343         struct osc_async_page *oap = NULL, *tmp;
2344         struct osc_brw_async_args *aa;
2345         const struct obd_async_page_ops *ops;
2346         CFS_LIST_HEAD(rpc_list);
2347         unsigned int ending_offset;
2348         unsigned  starting_offset = 0;
2349         int srvlock = 0;
2350         struct cl_object *clob = NULL;
2351         ENTRY;
2352
2353         /* If there are HP OAPs we need to handle at least 1 of them,
2354          * so move it to the beginning of the pending list. */
2355         if (!list_empty(&lop->lop_urgent)) {
2356                 oap = list_entry(lop->lop_urgent.next,
2357                                  struct osc_async_page, oap_urgent_item);
2358                 if (oap->oap_async_flags & ASYNC_HP)
2359                         list_move(&oap->oap_pending_item, &lop->lop_pending);
2360         }
2361
2362         /* first we find the pages we're allowed to work with */
2363         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2364                                  oap_pending_item) {
2365                 ops = oap->oap_caller_ops;
2366
2367                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2368                          "magic 0x%x\n", oap, oap->oap_magic);
2369
2370                 if (clob == NULL) {
2371                         /* pin object in memory, so that completion call-backs
2372                          * can be safely called under client_obd_list lock. */
2373                         clob = osc_oap2cl_page(oap)->cp_obj;
2374                         cl_object_get(clob);
2375                 }
2376
2377                 if (page_count != 0 &&
2378                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2379                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2380                                " oap %p, page %p, srvlock %u\n",
2381                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2382                         break;
2383                 }
2384                 /* in llite being 'ready' equates to the page being locked
2385                  * until completion unlocks it.  commit_write submits a page
2386                  * as not ready because its unlock will happen unconditionally
2387                  * as the call returns.  if we race with commit_write giving
2388                  * us that page we don't want to create a hole in the page
2389                  * stream, so we stop and leave the rpc to be fired by
2390                  * another dirtier or kupdated interval (the not ready page
2391                  * will still be on the dirty list).  we could call in
2392                  * at the end of ll_file_write to process the queue again. */
2393                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2394                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2395                                                     cmd);
2396                         if (rc < 0)
2397                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2398                                                 "instead of ready\n", oap,
2399                                                 oap->oap_page, rc);
2400                         switch (rc) {
2401                         case -EAGAIN:
2402                                 /* llite is telling us that the page is still
2403                                  * in commit_write and that we should try
2404                                  * and put it in an rpc again later.  we
2405                                  * break out of the loop so we don't create
2406                                  * a hole in the sequence of pages in the rpc
2407                                  * stream.*/
2408                                 oap = NULL;
2409                                 break;
2410                         case -EINTR:
2411                                 /* the io isn't needed; tell the checks
2412                                  * below to complete the rpc with EINTR */
2413                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2414                                 oap->oap_count = -EINTR;
2415                                 break;
2416                         case 0:
2417                                 oap->oap_async_flags |= ASYNC_READY;
2418                                 break;
2419                         default:
2420                                 LASSERTF(0, "oap %p page %p returned %d "
2421                                             "from make_ready\n", oap,
2422                                             oap->oap_page, rc);
2423                                 break;
2424                         }
2425                 }
2426                 if (oap == NULL)
2427                         break;
2428                 /*
2429                  * Page submitted for IO has to be locked. Either by
2430                  * ->ap_make_ready() or by higher layers.
2431                  */
2432 #if defined(__KERNEL__) && defined(__linux__)
2433                 {
2434                         struct cl_page *page;
2435
2436                         page = osc_oap2cl_page(oap);
2437
2438                         if (page->cp_type == CPT_CACHEABLE &&
2439                             !(PageLocked(oap->oap_page) &&
2440                               (CheckWriteback(oap->oap_page, cmd)))) {
2441                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2442                                        oap->oap_page,
2443                                        (long)oap->oap_page->flags,
2444                                        oap->oap_async_flags);
2445                                 LBUG();
2446                         }
2447                 }
2448 #endif
2449                 /* If there is a gap at the start of this page, it can't merge
2450                  * with any previous page, so we'll hand the network a
2451                  * "fragmented" page array that it can't transfer in 1 RDMA */
2452                 if (page_count != 0 && oap->oap_page_off != 0)
2453                         break;
2454
2455                 /* take the page out of our book-keeping */
2456                 list_del_init(&oap->oap_pending_item);
2457                 lop_update_pending(cli, lop, cmd, -1);
2458                 list_del_init(&oap->oap_urgent_item);
2459
2460                 if (page_count == 0)
2461                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2462                                           (PTLRPC_MAX_BRW_SIZE - 1);
2463
2464                 /* ask the caller for the size of the io as the rpc leaves. */
2465                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2466                         oap->oap_count =
2467                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2468                                                       cmd);
2469                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2470                 }
2471                 if (oap->oap_count <= 0) {
2472                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2473                                oap->oap_count);
2474                         osc_ap_completion(env, cli, NULL,
2475                                           oap, 0, oap->oap_count);
2476                         continue;
2477                 }
2478
2479                 /* now put the page back in our accounting */
2480                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2481                 if (page_count == 0)
2482                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2483                 if (++page_count >= cli->cl_max_pages_per_rpc)
2484                         break;
2485
2486                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2487                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2488                  * have the same alignment as the initial writes that allocated
2489                  * extents on the server. */
2490                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2491                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2492                 if (ending_offset == 0)
2493                         break;
2494
2495                 /* If there is a gap at the end of this page, it can't merge
2496                  * with any subsequent pages, so we'll hand the network a
2497                  * "fragmented" page array that it can't transfer in 1 RDMA */
2498                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2499                         break;
2500         }
2501
2502         osc_wake_cache_waiters(cli);
2503
2504         loi_list_maint(cli, loi);
2505
2506         client_obd_list_unlock(&cli->cl_loi_list_lock);
2507
2508         if (clob != NULL)
2509                 cl_object_put(env, clob);
2510
2511         if (page_count == 0) {
2512                 client_obd_list_lock(&cli->cl_loi_list_lock);
2513                 RETURN(0);
2514         }
2515
2516         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2517         if (IS_ERR(req)) {
2518                 LASSERT(list_empty(&rpc_list));
2519                 loi_list_maint(cli, loi);
2520                 RETURN(PTR_ERR(req));
2521         }
2522
2523         aa = ptlrpc_req_async_args(req);
2524
2525         if (cmd == OBD_BRW_READ) {
2526                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2527                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2528                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2529                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2530         } else {
2531                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2532                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2533                                  cli->cl_w_in_flight);
2534                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2535                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2536         }
2537         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2538
2539         client_obd_list_lock(&cli->cl_loi_list_lock);
2540
2541         if (cmd == OBD_BRW_READ)
2542                 cli->cl_r_in_flight++;
2543         else
2544                 cli->cl_w_in_flight++;
2545
2546         /* queued sync pages can be torn down while the pages
2547          * were between the pending list and the rpc */
2548         tmp = NULL;
2549         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2550                 /* only one oap gets a request reference */
2551                 if (tmp == NULL)
2552                         tmp = oap;
2553                 if (oap->oap_interrupted && !req->rq_intr) {
2554                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2555                                oap, req);
2556                         ptlrpc_mark_interrupted(req);
2557                 }
2558         }
2559         if (tmp != NULL)
2560                 tmp->oap_request = ptlrpc_request_addref(req);
2561
2562         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2563                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2564
2565         req->rq_interpret_reply = brw_interpret;
2566         ptlrpcd_add_req(req, PSCOPE_BRW);
2567         RETURN(1);
2568 }
2569
2570 #define LOI_DEBUG(LOI, STR, args...)                                     \
2571         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2572                !list_empty(&(LOI)->loi_ready_item) ||                    \
2573                !list_empty(&(LOI)->loi_hp_ready_item),                   \
2574                (LOI)->loi_write_lop.lop_num_pending,                     \
2575                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2576                (LOI)->loi_read_lop.lop_num_pending,                      \
2577                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2578                args)
2579
2580 /* This is called by osc_check_rpcs() to find which objects have pages that
2581  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2582 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2583 {
2584         ENTRY;
2585
2586         /* First return objects that have blocked locks so that they
2587          * will be flushed quickly and other clients can get the lock,
2588          * then objects which have pages ready to be stuffed into RPCs */
2589         if (!list_empty(&cli->cl_loi_hp_ready_list))
2590                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2591                                   struct lov_oinfo, loi_hp_ready_item));
2592         if (!list_empty(&cli->cl_loi_ready_list))
2593                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2594                                   struct lov_oinfo, loi_ready_item));
2595
2596         /* then if we have cache waiters, return all objects with queued
2597          * writes.  This is especially important when many small files
2598          * have filled up the cache and not been fired into rpcs because
2599          * they don't pass the nr_pending/object threshold */
2600         if (!list_empty(&cli->cl_cache_waiters) &&
2601             !list_empty(&cli->cl_loi_write_list))
2602                 RETURN(list_entry(cli->cl_loi_write_list.next,
2603                                   struct lov_oinfo, loi_write_item));
2604
2605         /* then return all queued objects when we have an invalid import
2606          * so that they get flushed */
2607         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2608                 if (!list_empty(&cli->cl_loi_write_list))
2609                         RETURN(list_entry(cli->cl_loi_write_list.next,
2610                                           struct lov_oinfo, loi_write_item));
2611                 if (!list_empty(&cli->cl_loi_read_list))
2612                         RETURN(list_entry(cli->cl_loi_read_list.next,
2613                                           struct lov_oinfo, loi_read_item));
2614         }
2615         RETURN(NULL);
2616 }
2617
2618 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2619 {
2620         struct osc_async_page *oap;
2621         int hprpc = 0;
2622
2623         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2624                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2625                                  struct osc_async_page, oap_urgent_item);
2626                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2627         }
2628
2629         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2630                 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2631                                  struct osc_async_page, oap_urgent_item);
2632                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2633         }
2634
2635         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2636 }
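/*
 * Editor's sketch with assumed values: with cl_max_rpcs_in_flight == 8
 * and 8 RPCs already in flight, osc_check_rpcs() normally stops sending;
 * but if the object's next urgent page is ASYNC_HP the limit is treated
 * as 9, letting one extra high-priority RPC jump the queue.
 */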
2637
2638 /* called with the loi list lock held */
2639 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2640 {
2641         struct lov_oinfo *loi;
2642         int rc = 0, race_counter = 0;
2643         ENTRY;
2644
2645         while ((loi = osc_next_loi(cli)) != NULL) {
2646                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2647
2648                 if (osc_max_rpc_in_flight(cli, loi))
2649                         break;
2650
2651                 /* attempt some read/write balancing by alternating between
2652                  * reads and writes in an object.  The makes_rpc checks here
2653                  * would be redundant if we were getting read/write work items
2654                  * instead of objects.  we don't want send_oap_rpc to drain a
2655                  * partial read pending queue when we're handed this object to
2656                  * do write io on while there are cache waiters */
2657                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2658                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2659                                               &loi->loi_write_lop);
2660                         if (rc < 0)
2661                                 break;
2662                         if (rc > 0)
2663                                 race_counter = 0;
2664                         else
2665                                 race_counter++;
2666                 }
2667                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2668                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2669                                               &loi->loi_read_lop);
2670                         if (rc < 0)
2671                                 break;
2672                         if (rc > 0)
2673                                 race_counter = 0;
2674                         else
2675                                 race_counter++;
2676                 }
2677
2678                 /* attempt some inter-object balancing by issuing rpcs
2679                  * for each object in turn */
2680                 if (!list_empty(&loi->loi_hp_ready_item))
2681                         list_del_init(&loi->loi_hp_ready_item);
2682                 if (!list_empty(&loi->loi_ready_item))
2683                         list_del_init(&loi->loi_ready_item);
2684                 if (!list_empty(&loi->loi_write_item))
2685                         list_del_init(&loi->loi_write_item);
2686                 if (!list_empty(&loi->loi_read_item))
2687                         list_del_init(&loi->loi_read_item);
2688
2689                 loi_list_maint(cli, loi);
2690
2691                 /* send_oap_rpc fails with 0 when make_ready tells it to
2692                  * back off.  llite's make_ready does this when it tries
2693                  * to lock a page queued for write that is already locked.
2694                  * we want to try sending rpcs from many objects, but we
2695                  * don't want to spin failing with 0.  */
2696                 if (race_counter == 10)
2697                         break;
2698         }
2699         EXIT;
2700 }
2701
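/*
 * Editor's sketch, not in the original source: the control flow of
 * osc_check_rpcs() above, reduced to its skeleton.  next_object(),
 * has_work() and send_rpc() are hypothetical stand-ins for osc_next_loi(),
 * lop_makes_rpc() and osc_send_oap_rpc(); a successful send resets the
 * race counter:
 *
 *     while ((loi = next_object(cli)) != NULL) {
 *             if (at_rpc_limit(cli, loi))
 *                     break;
 *             if (has_work(loi, WRITE) && send_rpc(loi, WRITE) == 0)
 *                     races++;               // make_ready backed off
 *             if (has_work(loi, READ) && send_rpc(loi, READ) == 0)
 *                     races++;
 *             requeue_last(cli, loi);        // inter-object fairness
 *             if (races == 10)               // don't spin on busy pages
 *                     break;
 *     }
 */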
2702 /* we're trying to queue a page in the osc so we're subject to the
2703  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2704  * If the osc's queued pages are already at that limit, then we want to sleep
2705  * until there is space in the osc's queue for us.  We also may be waiting for
2706  * write credits from the OST if there are RPCs in flight that may return some
2707  * before we fall back to sync writes.
2708  *
2709  * We need this to know our allocation was granted in the presence of signals */
2710 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2711 {
2712         int rc;
2713         ENTRY;
2714         client_obd_list_lock(&cli->cl_loi_list_lock);
2715         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2716         client_obd_list_unlock(&cli->cl_loi_list_lock);
2717         RETURN(rc);
2718 }
2719
2720 /**
2721  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2722  * is available.
2723  */
2724 int osc_enter_cache_try(const struct lu_env *env,
2725                         struct client_obd *cli, struct lov_oinfo *loi,
2726                         struct osc_async_page *oap, int transient)
2727 {
2728         int has_grant;
2729
2730         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2731         if (has_grant) {
2732                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2733                 if (transient) {
2734                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2735                         atomic_inc(&obd_dirty_transit_pages);
2736                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2737                 }
2738         }
2739         return has_grant;
2740 }
2741
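/*
 * Editor's note: a minimal model of the check osc_enter_cache_try() makes
 * below, assuming a 4096-byte CFS_PAGE_SIZE.  The client may cache one more
 * dirty page only while it still holds a page worth of server-issued grant;
 * consuming the grant is what osc_consume_write_grant() does for real:
 *
 *     long avail_grant = 12288;                  // hypothetical value
 *     int  has_grant   = avail_grant >= 4096;
 *     if (has_grant)
 *             avail_grant -= 4096;               // 8192 left: two pages
 */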
2742 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2743  * grant or cache space. */
2744 static int osc_enter_cache(const struct lu_env *env,
2745                            struct client_obd *cli, struct lov_oinfo *loi,
2746                            struct osc_async_page *oap)
2747 {
2748         struct osc_cache_waiter ocw;
2749         struct l_wait_info lwi = { 0 };
2750
2751         ENTRY;
2752
2753         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2754                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2755                cli->cl_dirty_max, obd_max_dirty_pages,
2756                cli->cl_lost_grant, cli->cl_avail_grant);
2757
2758         /* force the caller to try sync io.  this can jump the list
2759          * of queued writes and create a discontiguous rpc stream */
2760         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2761             loi->loi_ar.ar_force_sync)
2762                 RETURN(-EDQUOT);
2763
2764         /* Hopefully normal case - cache space and write credits available */
2765         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2766             atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2767             osc_enter_cache_try(env, cli, loi, oap, 0))
2768                 RETURN(0);
2769
2770         /* Make sure that there are write rpcs in flight to wait for.  This
2771          * is a little silly as this object may not have any rpcs pending,
2772          * but other objects certainly might. */
2773         if (cli->cl_w_in_flight) {
2774                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2775                 cfs_waitq_init(&ocw.ocw_waitq);
2776                 ocw.ocw_oap = oap;
2777                 ocw.ocw_rc = 0;
2778
2779                 loi_list_maint(cli, loi);
2780                 osc_check_rpcs(env, cli);
2781                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2782
2783                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2784                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2785
2786                 client_obd_list_lock(&cli->cl_loi_list_lock);
2787                 if (!list_empty(&ocw.ocw_entry)) {
2788                         list_del(&ocw.ocw_entry);
2789                         RETURN(-EINTR);
2790                 }
2791                 RETURN(ocw.ocw_rc);
2792         }
2793
2794         RETURN(-EDQUOT);
2795 }
2796
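/*
 * Editor's sketch of the sleep/wake protocol above (helper names invented):
 * the waiter queues itself under cl_loi_list_lock, sleeps until a completing
 * write RPC hands it space (and unlinks it) or until no RPCs remain in
 * flight, then rechecks under the lock:
 *
 *     queue_waiter(cli, &ocw);                 // under the list lock
 *     unlock(cli);
 *     wait_event(ocw.waitq, granted_or_idle(cli, &ocw));
 *     lock(cli);
 *     if (still_queued(&ocw))                  // woken because rpcs drained,
 *             return -EINTR;                   // not because we were granted
 *     return ocw.ocw_rc;                       // 0 on grant, else rpc error
 */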
2797
2798 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2799                         struct lov_oinfo *loi, cfs_page_t *page,
2800                         obd_off offset, const struct obd_async_page_ops *ops,
2801                         void *data, void **res, int nocache,
2802                         struct lustre_handle *lockh)
2803 {
2804         struct osc_async_page *oap;
2805
2806         ENTRY;
2807
2808         if (!page)
2809                 RETURN(size_round(sizeof(*oap)));
2810
2811         oap = *res;
2812         oap->oap_magic = OAP_MAGIC;
2813         oap->oap_cli = &exp->exp_obd->u.cli;
2814         oap->oap_loi = loi;
2815
2816         oap->oap_caller_ops = ops;
2817         oap->oap_caller_data = data;
2818
2819         oap->oap_page = page;
2820         oap->oap_obj_off = offset;
2821         if (!client_is_remote(exp) &&
2822             cfs_capable(CFS_CAP_SYS_RESOURCE))
2823                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2824
2825         LASSERT(!(offset & ~CFS_PAGE_MASK));
2826
2827         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2828         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2829         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2830         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2831
2832         spin_lock_init(&oap->oap_lock);
2833         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2834         RETURN(0);
2835 }
2836
2837 struct osc_async_page *oap_from_cookie(void *cookie)
2838 {
2839         struct osc_async_page *oap = cookie;
2840         if (oap->oap_magic != OAP_MAGIC)
2841                 return ERR_PTR(-EINVAL);
2842         return oap;
2843 }
2844
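/*
 * Editor's aside: oap_from_cookie() above is the usual magic-number guard
 * for opaque cookies crossing an API boundary.  A self-contained version of
 * the pattern with invented names:
 *
 *     #define FOO_MAGIC 0x0af0cafe
 *     struct foo { unsigned int foo_magic; };
 *
 *     static struct foo *foo_from_cookie(void *cookie)
 *     {
 *             struct foo *f = cookie;
 *             return f->foo_magic == FOO_MAGIC ? f : ERR_PTR(-EINVAL);
 *     }
 */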
2845 int osc_queue_async_io(const struct lu_env *env,
2846                        struct obd_export *exp, struct lov_stripe_md *lsm,
2847                        struct lov_oinfo *loi, void *cookie,
2848                        int cmd, obd_off off, int count,
2849                        obd_flag brw_flags, enum async_flags async_flags)
2850 {
2851         struct client_obd *cli = &exp->exp_obd->u.cli;
2852         struct osc_async_page *oap;
2853         int rc = 0;
2854         ENTRY;
2855
2856         oap = oap_from_cookie(cookie);
2857         if (IS_ERR(oap))
2858                 RETURN(PTR_ERR(oap));
2859
2860         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2861                 RETURN(-EIO);
2862
2863         if (!list_empty(&oap->oap_pending_item) ||
2864             !list_empty(&oap->oap_urgent_item) ||
2865             !list_empty(&oap->oap_rpc_item))
2866                 RETURN(-EBUSY);
2867
2868         /* check if the file's owner/group is over quota */
2869         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2870                 struct cl_object *obj;
2871                 struct cl_attr    attr; /* XXX put attr into thread info */
2872                 unsigned int qid[MAXQUOTAS];
2873
2874                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2875
2876                 cl_object_attr_lock(obj);
2877                 rc = cl_object_attr_get(env, obj, &attr);
2878                 cl_object_attr_unlock(obj);
2879
2880                 qid[USRQUOTA] = attr.cat_uid;
2881                 qid[GRPQUOTA] = attr.cat_gid;
2882                 if (rc == 0 &&
2883                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2884                         rc = -EDQUOT;
2885                 if (rc)
2886                         RETURN(rc);
2887         }
2888
2889         if (loi == NULL)
2890                 loi = lsm->lsm_oinfo[0];
2891
2892         client_obd_list_lock(&cli->cl_loi_list_lock);
2893
2894         LASSERT(off + count <= CFS_PAGE_SIZE);
2895         oap->oap_cmd = cmd;
2896         oap->oap_page_off = off;
2897         oap->oap_count = count;
2898         oap->oap_brw_flags = brw_flags;
2899         oap->oap_async_flags = async_flags;
2900
2901         if (cmd & OBD_BRW_WRITE) {
2902                 rc = osc_enter_cache(env, cli, loi, oap);
2903                 if (rc) {
2904                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2905                         RETURN(rc);
2906                 }
2907         }
2908
2909         osc_oap_to_pending(oap);
2910         loi_list_maint(cli, loi);
2911
2912         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2913                   cmd);
2914
2915         osc_check_rpcs(env, cli);
2916         client_obd_list_unlock(&cli->cl_loi_list_lock);
2917
2918         RETURN(0);
2919 }
2920
2921 /* aka (~was & now & flag), but this is more clear :) */
2922 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2923
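/*
 * Editor's note: SETTING(was, now, flag) is true exactly when "flag" is
 * being newly set -- clear in "was", set in "now" -- i.e. (~was & now & flag)
 * is non-zero.  Worked out on two-bit masks:
 *
 *     was = 0x1, now = 0x3, flag = 0x2:  SETTING == 1   (newly set)
 *     was = 0x3, now = 0x3, flag = 0x2:  SETTING == 0   (already set)
 *     was = 0x3, now = 0x1, flag = 0x2:  SETTING == 0   (being cleared)
 */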
2924 int osc_set_async_flags_base(struct client_obd *cli,
2925                              struct lov_oinfo *loi, struct osc_async_page *oap,
2926                              obd_flag async_flags)
2927 {
2928         struct loi_oap_pages *lop;
2929         ENTRY;
2930
2931         LASSERT(!list_empty(&oap->oap_pending_item));
2932
2933         if (oap->oap_cmd & OBD_BRW_WRITE) {
2934                 lop = &loi->loi_write_lop;
2935         } else {
2936                 lop = &loi->loi_read_lop;
2937         }
2938
2939         if ((oap->oap_async_flags & async_flags) == async_flags)
2940                 RETURN(0);
2941
2942         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2943                 oap->oap_async_flags |= ASYNC_READY;
2944
2945         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2946             list_empty(&oap->oap_rpc_item)) {
2947                 if (oap->oap_async_flags & ASYNC_HP)
2948                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2949                 else
2950                         list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2951                 oap->oap_async_flags |= ASYNC_URGENT;
2952                 loi_list_maint(cli, loi);
2953         }
2954
2955         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2956                         oap->oap_async_flags);
2957         RETURN(0);
2958 }
2959
2960 int osc_teardown_async_page(struct obd_export *exp,
2961                             struct lov_stripe_md *lsm,
2962                             struct lov_oinfo *loi, void *cookie)
2963 {
2964         struct client_obd *cli = &exp->exp_obd->u.cli;
2965         struct loi_oap_pages *lop;
2966         struct osc_async_page *oap;
2967         int rc = 0;
2968         ENTRY;
2969
2970         oap = oap_from_cookie(cookie);
2971         if (IS_ERR(oap))
2972                 RETURN(PTR_ERR(oap));
2973
2974         if (loi == NULL)
2975                 loi = lsm->lsm_oinfo[0];
2976
2977         if (oap->oap_cmd & OBD_BRW_WRITE) {
2978                 lop = &loi->loi_write_lop;
2979         } else {
2980                 lop = &loi->loi_read_lop;
2981         }
2982
2983         client_obd_list_lock(&cli->cl_loi_list_lock);
2984
2985         if (!list_empty(&oap->oap_rpc_item))
2986                 GOTO(out, rc = -EBUSY);
2987
2988         osc_exit_cache(cli, oap, 0);
2989         osc_wake_cache_waiters(cli);
2990
2991         if (!list_empty(&oap->oap_urgent_item)) {
2992                 list_del_init(&oap->oap_urgent_item);
2993                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
2994         }
2995         if (!list_empty(&oap->oap_pending_item)) {
2996                 list_del_init(&oap->oap_pending_item);
2997                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2998         }
2999         loi_list_maint(cli, loi);
3000         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3001 out:
3002         client_obd_list_unlock(&cli->cl_loi_list_lock);
3003         RETURN(rc);
3004 }
3005
3006 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3007                                          struct ldlm_enqueue_info *einfo,
3008                                          int flags)
3009 {
3010         void *data = einfo->ei_cbdata;
3011
3012         LASSERT(lock != NULL);
3013         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3014         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3015         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3016         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3017
3018         lock_res_and_lock(lock);
3019         spin_lock(&osc_ast_guard);
3020         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3021         lock->l_ast_data = data;
3022         spin_unlock(&osc_ast_guard);
3023         unlock_res_and_lock(lock);
3024 }
3025
3026 static void osc_set_data_with_check(struct lustre_handle *lockh,
3027                                     struct ldlm_enqueue_info *einfo,
3028                                     int flags)
3029 {
3030         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3031
3032         if (lock != NULL) {
3033                 osc_set_lock_data_with_check(lock, einfo, flags);
3034                 LDLM_LOCK_PUT(lock);
3035         } else
3036                 CERROR("lockh %p, data %p - client evicted?\n",
3037                        lockh, einfo->ei_cbdata);
3038 }
3039
3040 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3041                              ldlm_iterator_t replace, void *data)
3042 {
3043         struct ldlm_res_id res_id;
3044         struct obd_device *obd = class_exp2obd(exp);
3045
3046         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3047         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3048         return 0;
3049 }
3050
3051 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3052                             obd_enqueue_update_f upcall, void *cookie,
3053                             int *flags, int rc)
3054 {
3055         int intent = *flags & LDLM_FL_HAS_INTENT;
3056         ENTRY;
3057
3058         if (intent) {
3059                 /* The request was created before ldlm_cli_enqueue call. */
3060                 if (rc == ELDLM_LOCK_ABORTED) {
3061                         struct ldlm_reply *rep;
3062                         rep = req_capsule_server_get(&req->rq_pill,
3063                                                      &RMF_DLM_REP);
3064
3065                         LASSERT(rep != NULL);
3066                         if (rep->lock_policy_res1)
3067                                 rc = rep->lock_policy_res1;
3068                 }
3069         }
3070
3071         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3072                 *flags |= LDLM_FL_LVB_READY;
3073                 CDEBUG(D_INODE, "got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3074                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3075         }
3076
3077         /* Call the update callback. */
3078         rc = (*upcall)(cookie, rc);
3079         RETURN(rc);
3080 }
3081
3082 static int osc_enqueue_interpret(const struct lu_env *env,
3083                                  struct ptlrpc_request *req,
3084                                  struct osc_enqueue_args *aa, int rc)
3085 {
3086         struct ldlm_lock *lock;
3087         struct lustre_handle handle;
3088         __u32 mode;
3089
3090         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3091          * might be freed anytime after lock upcall has been called. */
3092         lustre_handle_copy(&handle, aa->oa_lockh);
3093         mode = aa->oa_ei->ei_mode;
3094
3095         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3096          * be valid. */
3097         lock = ldlm_handle2lock(&handle);
3098
3099         /* Take an additional reference so that a blocking AST that
3100          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3101          * to arrive after an upcall has been executed by
3102          * osc_enqueue_fini(). */
3103         ldlm_lock_addref(&handle, mode);
3104
3105         /* Complete obtaining the lock procedure. */
3106         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3107                                    mode, aa->oa_flags, aa->oa_lvb,
3108                                    sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
3109                                    &handle, rc);
3110         /* Complete osc stuff. */
3111         rc = osc_enqueue_fini(req, aa->oa_lvb,
3112                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3113
3114         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3115
3116         /* Release the lock for async request. */
3117         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3118                 /*
3119                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3120                  * not already released by
3121                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3122                  */
3123                 ldlm_lock_decref(&handle, mode);
3124
3125         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3126                  aa->oa_lockh, req, aa);
3127         ldlm_lock_decref(&handle, mode);
3128         LDLM_LOCK_PUT(lock);
3129         return rc;
3130 }
3131
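/*
 * Editor's sketch of the rq_async_args idiom used by the interpret callbacks
 * in this file: a small per-request argument struct lives inline in the
 * request rather than being allocated, and CLASSERT rejects oversized args
 * at compile time.  Schematically, with an invented struct:
 *
 *     struct my_args { void *cookie; int flags; };
 *
 *     CLASSERT(sizeof(struct my_args) <= sizeof(req->rq_async_args));
 *     aa = ptlrpc_req_async_args(req);          // points into the request
 *     aa->cookie = cookie;
 *     req->rq_interpret_reply = my_interpret;   // sees aa on completion
 */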
3132 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3133                         struct lov_oinfo *loi, int flags,
3134                         struct ost_lvb *lvb, __u32 mode, int rc)
3135 {
3136         if (rc == ELDLM_OK) {
3137                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3138                 __u64 tmp;
3139
3140                 LASSERT(lock != NULL);
3141                 loi->loi_lvb = *lvb;
3142                 tmp = loi->loi_lvb.lvb_size;
3143                 /* Extend KMS up to the end of this lock and no further
3144                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3145                 if (tmp > lock->l_policy_data.l_extent.end)
3146                         tmp = lock->l_policy_data.l_extent.end + 1;
3147                 if (tmp >= loi->loi_kms) {
3148                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3149                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3150                         loi_kms_set(loi, tmp);
3151                 } else {
3152                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3153                                    LPU64"; leaving kms="LPU64", end="LPU64,
3154                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3155                                    lock->l_policy_data.l_extent.end);
3156                 }
3157                 ldlm_lock_allow_match(lock);
3158                 LDLM_LOCK_PUT(lock);
3159         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3160                 loi->loi_lvb = *lvb;
3161                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3162                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3163                 rc = ELDLM_OK;
3164         }
3165 }
3166 EXPORT_SYMBOL(osc_update_enqueue);
3167
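/*
 * Editor's worked example for the KMS update above: a lock on the extent
 * [0, 65535] covers 65536 bytes, so with lvb_size = 100000 the candidate is
 * capped at l_extent.end + 1 = 65536, while with lvb_size = 4096 it stays
 * 4096.  Either way this path only ever raises loi_kms, never lowers it.
 */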
3168 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3169
3170 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
3171  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3172  * other synchronous requests, but holding some locks while trying to obtain
3173  * others may take a considerable amount of time in case of OST failure; and
3174  * when a client fails to release locks that other sync requests are waiting
3175  * for, it is evicted from the cluster -- such scenarios make life difficult,
3176  * so release locks just after they are obtained. */
3177 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3178                      int *flags, ldlm_policy_data_t *policy,
3179                      struct ost_lvb *lvb, int kms_valid,
3180                      obd_enqueue_update_f upcall, void *cookie,
3181                      struct ldlm_enqueue_info *einfo,
3182                      struct lustre_handle *lockh,
3183                      struct ptlrpc_request_set *rqset, int async)
3184 {
3185         struct obd_device *obd = exp->exp_obd;
3186         struct ptlrpc_request *req = NULL;
3187         int intent = *flags & LDLM_FL_HAS_INTENT;
3188         ldlm_mode_t mode;
3189         int rc;
3190         ENTRY;
3191
3192         /* Filesystem lock extents are extended to page boundaries so that
3193          * dealing with the page cache is a little smoother.  */
3194         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3195         policy->l_extent.end |= ~CFS_PAGE_MASK;
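        /*
         * Editor's note: with 4096-byte pages CFS_PAGE_MASK keeps the high
         * bits, so e.g. an extent [6000, 10000] widens to [4096, 12287]:
         * the start drops its sub-page offset, the end fills it in.
         */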
3196
3197         /*
3198          * kms is not valid when either object is completely fresh (so that no
3199          * locks are cached), or object was evicted. In the latter case cached
3200          * lock cannot be used, because it would prime inode state with
3201          * potentially stale LVB.
3202          */
3203         if (!kms_valid)
3204                 goto no_match;
3205
3206         /* Next, search for already existing extent locks that will cover us */
3207         /* If we're trying to read, we also search for an existing PW lock.  The
3208          * VFS and page cache already protect us locally, so lots of readers/
3209          * writers can share a single PW lock.
3210          *
3211          * There are problems with conversion deadlocks, so instead of
3212          * converting a read lock to a write lock, we'll just enqueue a new
3213          * one.
3214          *
3215          * At some point we should cancel the read lock instead of making them
3216          * send us a blocking callback, but there are problems with canceling
3217          * locks out from other users right now, too. */
3218         mode = einfo->ei_mode;
3219         if (einfo->ei_mode == LCK_PR)
3220                 mode |= LCK_PW;
3221         mode = ldlm_lock_match(obd->obd_namespace,
3222                                *flags | LDLM_FL_LVB_READY, res_id,
3223                                einfo->ei_type, policy, mode, lockh, 0);
3224         if (mode) {
3225                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3226
3227                 if (matched->l_ast_data == NULL ||
3228                     matched->l_ast_data == einfo->ei_cbdata) {
3229                         /* addref the lock only for non-async requests when a
3230                          * PW lock is matched whereas we asked for PR. */
3231                         if (!rqset && einfo->ei_mode != mode)
3232                                 ldlm_lock_addref(lockh, LCK_PR);
3233                         osc_set_lock_data_with_check(matched, einfo, *flags);
3234                         if (intent) {
3235                                 /* I would like to be able to ASSERT here that
3236                                  * rss <= kms, but I can't, for reasons which
3237                                  * are explained in lov_enqueue() */
3238                         }
3239
3240                         /* We already have a lock, and it's referenced */
3241                         (*upcall)(cookie, ELDLM_OK);
3242
3243                         /* For async requests, decref the lock. */
3244                         if (einfo->ei_mode != mode)
3245                                 ldlm_lock_decref(lockh, LCK_PW);
3246                         else if (rqset)
3247                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3248                         LDLM_LOCK_PUT(matched);
3249                         RETURN(ELDLM_OK);
3250                 } else
3251                         ldlm_lock_decref(lockh, mode);
3252                 LDLM_LOCK_PUT(matched);
3253         }
3254
3255  no_match:
3256         if (intent) {
3257                 CFS_LIST_HEAD(cancels);
3258                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3259                                            &RQF_LDLM_ENQUEUE_LVB);
3260                 if (req == NULL)
3261                         RETURN(-ENOMEM);
3262
3263                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3264                 if (rc)
3265                         RETURN(rc);
3266
3267                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3268                                      sizeof(*lvb));
3269                 ptlrpc_request_set_replen(req);
3270         }
3271
3272         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3273         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3274
3275         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3276                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3277         if (rqset) {
3278                 if (!rc) {
3279                         struct osc_enqueue_args *aa;
3280                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3281                         aa = ptlrpc_req_async_args(req);
3282                         aa->oa_ei = einfo;
3283                         aa->oa_exp = exp;
3284                         aa->oa_flags  = flags;
3285                         aa->oa_upcall = upcall;
3286                         aa->oa_cookie = cookie;
3287                         aa->oa_lvb    = lvb;
3288                         aa->oa_lockh  = lockh;
3289
3290                         req->rq_interpret_reply =
3291                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3292                         if (rqset == PTLRPCD_SET)
3293                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3294                         else
3295                                 ptlrpc_set_add_req(rqset, req);
3296                 } else if (intent) {
3297                         ptlrpc_req_finished(req);
3298                 }
3299                 RETURN(rc);
3300         }
3301
3302         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3303         if (intent)
3304                 ptlrpc_req_finished(req);
3305
3306         RETURN(rc);
3307 }
3308
3309 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3310                        struct ldlm_enqueue_info *einfo,
3311                        struct ptlrpc_request_set *rqset)
3312 {
3313         struct ldlm_res_id res_id;
3314         int rc;
3315         ENTRY;
3316
3317         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3318                            oinfo->oi_md->lsm_object_gr, &res_id);
3319
3320         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3321                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3322                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3323                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3324                               rqset, rqset != NULL);
3325         RETURN(rc);
3326 }
3327
3328 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3329                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3330                    int *flags, void *data, struct lustre_handle *lockh,
3331                    int unref)
3332 {
3333         struct obd_device *obd = exp->exp_obd;
3334         int lflags = *flags;
3335         ldlm_mode_t rc;
3336         ENTRY;
3337
3338         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3339                 RETURN(-EIO);
3340
3341         /* Filesystem lock extents are extended to page boundaries so that
3342          * dealing with the page cache is a little smoother */
3343         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3344         policy->l_extent.end |= ~CFS_PAGE_MASK;
3345
3346         /* Next, search for already existing extent locks that will cover us */
3347         /* If we're trying to read, we also search for an existing PW lock.  The
3348          * VFS and page cache already protect us locally, so lots of readers/
3349          * writers can share a single PW lock. */
3350         rc = mode;
3351         if (mode == LCK_PR)
3352                 rc |= LCK_PW;
3353         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3354                              res_id, type, policy, rc, lockh, unref);
3355         if (rc) {
3356                 if (data != NULL)
3357                         osc_set_data_with_check(lockh, data, lflags);
3358                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3359                         ldlm_lock_addref(lockh, LCK_PR);
3360                         ldlm_lock_decref(lockh, LCK_PW);
3361                 }
3362                 RETURN(rc);
3363         }
3364         RETURN(rc);
3365 }
3366
3367 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3368 {
3369         ENTRY;
3370
3371         if (unlikely(mode == LCK_GROUP))
3372                 ldlm_lock_decref_and_cancel(lockh, mode);
3373         else
3374                 ldlm_lock_decref(lockh, mode);
3375
3376         RETURN(0);
3377 }
3378
3379 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3380                       __u32 mode, struct lustre_handle *lockh)
3381 {
3382         ENTRY;
3383         RETURN(osc_cancel_base(lockh, mode));
3384 }
3385
3386 static int osc_cancel_unused(struct obd_export *exp,
3387                              struct lov_stripe_md *lsm, int flags,
3388                              void *opaque)
3389 {
3390         struct obd_device *obd = class_exp2obd(exp);
3391         struct ldlm_res_id res_id, *resp = NULL;
3392
3393         if (lsm != NULL) {
3394                 resp = osc_build_res_name(lsm->lsm_object_id,
3395                                           lsm->lsm_object_gr, &res_id);
3396         }
3397
3398         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3399 }
3400
3401 static int osc_statfs_interpret(const struct lu_env *env,
3402                                 struct ptlrpc_request *req,
3403                                 struct osc_async_args *aa, int rc)
3404 {
3405         struct obd_statfs *msfs;
3406         ENTRY;
3407
3408         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3409             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3410                 GOTO(out, rc = 0);
3411
3412         if (rc != 0)
3413                 GOTO(out, rc);
3414
3415         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3416         if (msfs == NULL) {
3417                 GOTO(out, rc = -EPROTO);
3418         }
3419
3420         *aa->aa_oi->oi_osfs = *msfs;
3421 out:
3422         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3423         RETURN(rc);
3424 }
3425
3426 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3427                             __u64 max_age, struct ptlrpc_request_set *rqset)
3428 {
3429         struct ptlrpc_request *req;
3430         struct osc_async_args *aa;
3431         int                    rc;
3432         ENTRY;
3433
3434         /* We could possibly pass max_age in the request (as an absolute
3435          * timestamp or a "seconds.usec ago") so the target can avoid doing
3436          * extra calls into the filesystem if that isn't necessary (e.g.
3437          * during mount that would help a bit).  Having relative timestamps
3438          * is not so great if request processing is slow, while absolute
3439          * timestamps are not ideal because they need time synchronization. */
3440         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3441         if (req == NULL)
3442                 RETURN(-ENOMEM);
3443
3444         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3445         if (rc) {
3446                 ptlrpc_request_free(req);
3447                 RETURN(rc);
3448         }
3449         ptlrpc_request_set_replen(req);
3450         req->rq_request_portal = OST_CREATE_PORTAL;
3451         ptlrpc_at_set_req_timeout(req);
3452
3453         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3454                 /* procfs requests should not wait on stat, to avoid deadlock */
3455                 req->rq_no_resend = 1;
3456                 req->rq_no_delay = 1;
3457         }
3458
3459         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3460         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3461         aa = ptlrpc_req_async_args(req);
3462         aa->aa_oi = oinfo;
3463
3464         ptlrpc_set_add_req(rqset, req);
3465         RETURN(0);
3466 }
3467
3468 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3469                       __u64 max_age, __u32 flags)
3470 {
3471         struct obd_statfs     *msfs;
3472         struct ptlrpc_request *req;
3473         struct obd_import     *imp = NULL;
3474         int rc;
3475         ENTRY;
3476
3477         /* Since the request might also come from lprocfs, we need to
3478          * sync this with client_disconnect_export (bug 15684) */
3479         down_read(&obd->u.cli.cl_sem);
3480         if (obd->u.cli.cl_import)
3481                 imp = class_import_get(obd->u.cli.cl_import);
3482         up_read(&obd->u.cli.cl_sem);
3483         if (!imp)
3484                 RETURN(-ENODEV);
3485
3486         /* We could possibly pass max_age in the request (as an absolute
3487          * timestamp or a "seconds.usec ago") so the target can avoid doing
3488          * extra calls into the filesystem if that isn't necessary (e.g.
3489          * during mount that would help a bit).  Having relative timestamps
3490          * is not so great if request processing is slow, while absolute
3491          * timestamps are not ideal because they need time synchronization. */
3492         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3493
3494         class_import_put(imp);
3495
3496         if (req == NULL)
3497                 RETURN(-ENOMEM);
3498
3499         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3500         if (rc) {
3501                 ptlrpc_request_free(req);
3502                 RETURN(rc);
3503         }
3504         ptlrpc_request_set_replen(req);
3505         req->rq_request_portal = OST_CREATE_PORTAL;
3506         ptlrpc_at_set_req_timeout(req);
3507
3508         if (flags & OBD_STATFS_NODELAY) {
3509                 /* procfs requests should not wait on stat, to avoid deadlock */
3510                 req->rq_no_resend = 1;
3511                 req->rq_no_delay = 1;
3512         }
3513
3514         rc = ptlrpc_queue_wait(req);
3515         if (rc)
3516                 GOTO(out, rc);
3517
3518         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3519         if (msfs == NULL) {
3520                 GOTO(out, rc = -EPROTO);
3521         }
3522
3523         *osfs = *msfs;
3524
3525         EXIT;
3526  out:
3527         ptlrpc_req_finished(req);
3528         return rc;
3529 }
3530
3531 /* Retrieve object striping information.
3532  *
3533  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3534  * the maximum number of OST indices which will fit in the user buffer.
3535  * lmm_magic must be LOV_USER_MAGIC_V1 or _V3 (we only use 1 slot here).
3536  */
3537 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3538 {
3539         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3540         struct lov_user_md_v3 lum, *lumk;
3541         struct lov_user_ost_data_v1 *lmm_objects;
3542         int rc = 0, lum_size;
3543         ENTRY;
3544
3545         if (!lsm)
3546                 RETURN(-ENODATA);
3547
3548         /* we only need the header part from user space to get lmm_magic and
3549          * lmm_stripe_count, (the header part is common to v1 and v3) */
3550         lum_size = sizeof(struct lov_user_md_v1);
3551         if (copy_from_user(&lum, lump, lum_size))
3552                 RETURN(-EFAULT);
3553
3554         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3555             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3556                 RETURN(-EINVAL);
3557
3558         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3559         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3560         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3561         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3562
3563         /* we can use lov_mds_md_size() to compute lum_size
3564          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3565         if (lum.lmm_stripe_count > 0) {
3566                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3567                 OBD_ALLOC(lumk, lum_size);
3568                 if (!lumk)
3569                         RETURN(-ENOMEM);
3570
3571                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3572                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3573                 else
3574                         lmm_objects = &(lumk->lmm_objects[0]);
3575                 lmm_objects->l_object_id = lsm->lsm_object_id;
3576         } else {
3577                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3578                 lumk = &lum;
3579         }
3580
3581         lumk->lmm_object_id = lsm->lsm_object_id;
3582         lumk->lmm_object_gr = lsm->lsm_object_gr;
3583         lumk->lmm_stripe_count = 1;
3584
3585         if (copy_to_user(lump, lumk, lum_size))
3586                 rc = -EFAULT;
3587
3588         if (lumk != &lum)
3589                 OBD_FREE(lumk, lum_size);
3590
3591         RETURN(rc);
3592 }
3593
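/*
 * Editor's sketch of the probe-then-copy ioctl pattern osc_getstripe() uses:
 * read just the fixed header from userspace, derive the full buffer size
 * from it, then copy the complete reply back.  Reduced form, names invented:
 *
 *     if (copy_from_user(&hdr, uptr, sizeof(hdr)))   // header only
 *             return -EFAULT;
 *     len = reply_size(hdr.lmm_magic, hdr.lmm_stripe_count);
 *     buf = alloc_reply(len);
 *     fill_reply(buf);
 *     if (copy_to_user(uptr, buf, len))              // full reply
 *             rc = -EFAULT;
 */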
3594
3595 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3596                          void *karg, void *uarg)
3597 {
3598         struct obd_device *obd = exp->exp_obd;
3599         struct obd_ioctl_data *data = karg;
3600         int err = 0;
3601         ENTRY;
3602
3603         if (!try_module_get(THIS_MODULE)) {
3604                 CERROR("Can't get module. Is it alive?\n");
3605                 return -EINVAL;
3606         }
3607         switch (cmd) {
3608         case OBD_IOC_LOV_GET_CONFIG: {
3609                 char *buf;
3610                 struct lov_desc *desc;
3611                 struct obd_uuid uuid;
3612
3613                 buf = NULL;
3614                 len = 0;
3615                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3616                         GOTO(out, err = -EINVAL);
3617
3618                 data = (struct obd_ioctl_data *)buf;
3619
3620                 if (sizeof(*desc) > data->ioc_inllen1) {
3621                         obd_ioctl_freedata(buf, len);
3622                         GOTO(out, err = -EINVAL);
3623                 }
3624
3625                 if (data->ioc_inllen2 < sizeof(uuid)) {
3626                         obd_ioctl_freedata(buf, len);
3627                         GOTO(out, err = -EINVAL);
3628                 }
3629
3630                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3631                 desc->ld_tgt_count = 1;
3632                 desc->ld_active_tgt_count = 1;
3633                 desc->ld_default_stripe_count = 1;
3634                 desc->ld_default_stripe_size = 0;
3635                 desc->ld_default_stripe_offset = 0;
3636                 desc->ld_pattern = 0;
3637                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3638
3639                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3640
3641                 err = copy_to_user((void *)uarg, buf, len);
3642                 if (err)
3643                         err = -EFAULT;
3644                 obd_ioctl_freedata(buf, len);
3645                 GOTO(out, err);
3646         }
3647         case LL_IOC_LOV_SETSTRIPE:
3648                 err = obd_alloc_memmd(exp, karg);
3649                 if (err > 0)
3650                         err = 0;
3651                 GOTO(out, err);
3652         case LL_IOC_LOV_GETSTRIPE:
3653                 err = osc_getstripe(karg, uarg);
3654                 GOTO(out, err);
3655         case OBD_IOC_CLIENT_RECOVER:
3656                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3657                                             data->ioc_inlbuf1);
3658                 if (err > 0)
3659                         err = 0;
3660                 GOTO(out, err);
3661         case IOC_OSC_SET_ACTIVE:
3662                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3663                                                data->ioc_offset);
3664                 GOTO(out, err);
3665         case OBD_IOC_POLL_QUOTACHECK:
3666                 err = lquota_poll_check(quota_interface, exp,
3667                                         (struct if_quotacheck *)karg);
3668                 GOTO(out, err);
3669         case OBD_IOC_PING_TARGET:
3670                 err = ptlrpc_obd_ping(obd);
3671                 GOTO(out, err);
3672         default:
3673                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3674                        cmd, cfs_curproc_comm());
3675                 GOTO(out, err = -ENOTTY);
3676         }
3677 out:
3678         module_put(THIS_MODULE);
3679         return err;
3680 }
3681
3682 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3683                         void *key, __u32 *vallen, void *val,
3684                         struct lov_stripe_md *lsm)
3685 {
3686         ENTRY;
3687         if (!vallen || !val)
3688                 RETURN(-EFAULT);
3689
3690         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3691                 __u32 *stripe = val;
3692                 *vallen = sizeof(*stripe);
3693                 *stripe = 0;
3694                 RETURN(0);
3695         } else if (KEY_IS(KEY_LAST_ID)) {
3696                 struct ptlrpc_request *req;
3697                 obd_id                *reply;
3698                 char                  *tmp;
3699                 int                    rc;
3700
3701                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3702                                            &RQF_OST_GET_INFO_LAST_ID);
3703                 if (req == NULL)
3704                         RETURN(-ENOMEM);
3705
3706                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3707                                      RCL_CLIENT, keylen);
3708                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3709                 if (rc) {
3710                         ptlrpc_request_free(req);
3711                         RETURN(rc);
3712                 }
3713
3714                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3715                 memcpy(tmp, key, keylen);
3716
3717                 req->rq_no_delay = req->rq_no_resend = 1;
3718                 ptlrpc_request_set_replen(req);
3719                 rc = ptlrpc_queue_wait(req);
3720                 if (rc)
3721                         GOTO(out, rc);
3722
3723                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3724                 if (reply == NULL)
3725                         GOTO(out, rc = -EPROTO);
3726
3727                 *((obd_id *)val) = *reply;
3728         out:
3729                 ptlrpc_req_finished(req);
3730                 RETURN(rc);
3731         } else if (KEY_IS(KEY_FIEMAP)) {
3732                 struct ptlrpc_request *req;
3733                 struct ll_user_fiemap *reply;
3734                 char *tmp;
3735                 int rc;
3736
3737                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3738                                            &RQF_OST_GET_INFO_FIEMAP);
3739                 if (req == NULL)
3740                         RETURN(-ENOMEM);
3741
3742                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3743                                      RCL_CLIENT, keylen);
3744                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3745                                      RCL_CLIENT, *vallen);
3746                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3747                                      RCL_SERVER, *vallen);
3748
3749                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3750                 if (rc) {
3751                         ptlrpc_request_free(req);
3752                         RETURN(rc);
3753                 }
3754
3755                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3756                 memcpy(tmp, key, keylen);
3757                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3758                 memcpy(tmp, val, *vallen);
3759
3760                 ptlrpc_request_set_replen(req);
3761                 rc = ptlrpc_queue_wait(req);
3762                 if (rc)
3763                         GOTO(out1, rc);
3764
3765                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3766                 if (reply == NULL)
3767                         GOTO(out1, rc = -EPROTO);
3768
3769                 memcpy(val, reply, *vallen);
3770         out1:
3771                 ptlrpc_req_finished(req);
3772
3773                 RETURN(rc);
3774         }
3775
3776         RETURN(-EINVAL);
3777 }
3778
3779 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3780 {
3781         struct llog_ctxt *ctxt;
3782         int rc = 0;
3783         ENTRY;
3784
3785         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3786         if (ctxt) {
3787                 rc = llog_initiator_connect(ctxt);
3788                 llog_ctxt_put(ctxt);
3789         } else {
3790                 /* XXX return an error? skip setting below flags? */
3791         }
3792
3793         spin_lock(&imp->imp_lock);
3794         imp->imp_server_timeout = 1;
3795         imp->imp_pingable = 1;
3796         spin_unlock(&imp->imp_lock);
3797         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3798
3799         RETURN(rc);
3800 }
3801
3802 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3803                                           struct ptlrpc_request *req,
3804                                           void *aa, int rc)
3805 {
3806         ENTRY;
3807         if (rc != 0)
3808                 RETURN(rc);
3809
3810         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3811 }
3812
3813 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3814                               void *key, obd_count vallen, void *val,
3815                               struct ptlrpc_request_set *set)
3816 {
3817         struct ptlrpc_request *req;
3818         struct obd_device     *obd = exp->exp_obd;
3819         struct obd_import     *imp = class_exp2cliimp(exp);
3820         char                  *tmp;
3821         int                    rc;
3822         ENTRY;
3823
3824         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3825
3826         if (KEY_IS(KEY_NEXT_ID)) {
3827                 if (vallen != sizeof(obd_id))
3828                         RETURN(-ERANGE);
3829                 if (val == NULL)
3830                         RETURN(-EINVAL);
3831                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3832                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3833                        exp->exp_obd->obd_name,
3834                        obd->u.cli.cl_oscc.oscc_next_id);
3835
3836                 RETURN(0);
3837         }
3838
3839         if (KEY_IS(KEY_UNLINKED)) {
3840                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3841                 spin_lock(&oscc->oscc_lock);
3842                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3843                 spin_unlock(&oscc->oscc_lock);
3844                 RETURN(0);
3845         }
3846
3847         if (KEY_IS(KEY_INIT_RECOV)) {
3848                 if (vallen != sizeof(int))
3849                         RETURN(-EINVAL);
3850                 spin_lock(&imp->imp_lock);
3851                 imp->imp_initial_recov = *(int *)val;
3852                 spin_unlock(&imp->imp_lock);
3853                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3854                        exp->exp_obd->obd_name,
3855                        imp->imp_initial_recov);
3856                 RETURN(0);
3857         }
3858
3859         if (KEY_IS(KEY_CHECKSUM)) {
3860                 if (vallen != sizeof(int))
3861                         RETURN(-EINVAL);
3862                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3863                 RETURN(0);
3864         }
3865
3866         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3867                 sptlrpc_conf_client_adapt(obd);
3868                 RETURN(0);
3869         }
3870
3871         if (KEY_IS(KEY_FLUSH_CTX)) {
3872                 sptlrpc_import_flush_my_ctx(imp);
3873                 RETURN(0);
3874         }
3875
3876         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3877                 RETURN(-EINVAL);
3878
3879         /* We pass all other commands directly to OST. Since nobody calls osc
3880            methods directly and everybody is supposed to go through LOV, we
3881            assume lov checked invalid values for us.
3882            The only recognised values so far are evict_by_nid and mds_conn.
3883            Even if something bad goes through, we'd get a -EINVAL from OST
3884            anyway. */
3885
3886         if (KEY_IS(KEY_GRANT_SHRINK))
3887                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3888         else
3889                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3890
3891         if (req == NULL)
3892                 RETURN(-ENOMEM);
3893
3894         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3895                              RCL_CLIENT, keylen);
3896         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3897                              RCL_CLIENT, vallen);
3898         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3899         if (rc) {
3900                 ptlrpc_request_free(req);
3901                 RETURN(rc);
3902         }
3903
3904         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3905         memcpy(tmp, key, keylen);
3906         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3907         memcpy(tmp, val, vallen);
3908
3909         if (KEY_IS(KEY_MDS_CONN)) {
3910                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3911
3912                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3913                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3914                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
3915                 req->rq_no_delay = req->rq_no_resend = 1;
3916                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3917         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3918                 struct osc_grant_args *aa;
3919                 struct obdo *oa;
3920
3921                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3922                 aa = ptlrpc_req_async_args(req);
3923                 OBD_ALLOC_PTR(oa);
3924                 if (!oa) {
3925                         ptlrpc_req_finished(req);
3926                         RETURN(-ENOMEM);
3927                 }
3928                 *oa = ((struct ost_body *)val)->oa;
3929                 aa->aa_oa = oa;
3930                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3931         }
3932
3933         ptlrpc_request_set_replen(req);
3934         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3935                 LASSERT(set != NULL);
3936                 ptlrpc_set_add_req(set, req);
3937                 ptlrpc_check_set(NULL, set);
3938         } else
3939                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3940
3941         RETURN(0);
3942 }
3943
3944
3945 static struct llog_operations osc_size_repl_logops = {
3946         lop_cancel: llog_obd_repl_cancel
3947 };
3948
3949 static struct llog_operations osc_mds_ost_orig_logops;
3950 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3951                          struct obd_device *tgt, int count,
3952                          struct llog_catid *catid, struct obd_uuid *uuid)
3953 {
3954         int rc;
3955         ENTRY;
3956
3957         LASSERT(olg == &obd->obd_olg);
3958         spin_lock(&obd->obd_dev_lock);
3959         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3960                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3961                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3962                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3963                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3964                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3965         }
3966         spin_unlock(&obd->obd_dev_lock);
3967
3968         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3969                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3970         if (rc) {
3971                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3972                 GOTO(out, rc);
3973         }
3974
3975         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3976                         NULL, &osc_size_repl_logops);
3977         if (rc) {
3978                 struct llog_ctxt *ctxt =
3979                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3980                 if (ctxt)
3981                         llog_cleanup(ctxt);
3982                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3983         }
3984         GOTO(out, rc);
3985 out:
3986         if (rc) {
3987                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3988                        obd->obd_name, tgt->obd_name, count, catid, rc);
3989                 CERROR("logid "LPX64":0x%x\n",
3990                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3991         }
3992         return rc;
3993 }
3994
3995 static int osc_llog_finish(struct obd_device *obd, int count)
3996 {
3997         struct llog_ctxt *ctxt;
3998         int rc = 0, rc2 = 0;
3999         ENTRY;
4000
4001         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4002         if (ctxt)
4003                 rc = llog_cleanup(ctxt);
4004
4005         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4006         if (ctxt)
4007                 rc2 = llog_cleanup(ctxt);
4008         if (!rc)
4009                 rc = rc2;
4010
4011         RETURN(rc);
4012 }
4013
4014 static int osc_reconnect(const struct lu_env *env,
4015                          struct obd_export *exp, struct obd_device *obd,
4016                          struct obd_uuid *cluuid,
4017                          struct obd_connect_data *data,
4018                          void *localdata)
4019 {
4020         struct client_obd *cli = &obd->u.cli;
4021         ENTRY;
4022         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4023                 long lost_grant;
4024
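                /* Ask the server either for the grant we still hold, so its
                 * accounting can be resynchronized, or, if we hold none,
                 * for enough to cover two full-sized RPCs. */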
                client_obd_list_lock(&cli->cl_loi_list_lock);
                data->ocd_grant = cli->cl_avail_grant ?:
                                2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
                       "cl_lost_grant: %ld\n", data->ocd_grant,
                       cli->cl_avail_grant, lost_grant);
                CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
                       " ocd_grant: %d\n", data->ocd_connect_flags,
                       data->ocd_version, data->ocd_grant);
        }

        RETURN(0);
}

static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but
         * that causes the following problem if setup (connect) and cleanup
         * (disconnect) become tangled together:
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interpret
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! The pinger triggers the shrink on a client that is being
         * cleaned up. So the osc must be removed from the shrink list only
         * after we are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}

static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSCs. */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
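                /* Grant is a per-connection resource: whatever we still
                 * hold is stale once the import disconnects, so drop it
                 * here and renegotiate on reconnect (IMP_EVENT_OCD). */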
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        client_obd_list_lock(&cli->cl_loi_list_lock);
                        /* All pages go into failing RPCs because the
                         * import is invalid. */
                        osc_check_rpcs(env, cli);
                        client_obd_list_unlock(&cli->cl_loi_list_lock);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSCs. */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =
                                OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}

int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        int rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc) {
                ptlrpcd_decref();
        } else {
                struct lprocfs_static_vars lvars = { 0 };
                struct client_obd *cli = &obd->u.cli;

                cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
                lprocfs_osc_init_vars(&lvars);
                if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
                        lproc_osc_attach_seqstat(obd);
                        sptlrpc_lprocfs_cliobd_attach(obd);
                        ptlrpc_lprocfs_register_obd(obd);
                }

                oscc_init(obd);
                /* We need to allocate a few extra requests, because
                   brw_interpret tries to create new requests before freeing
                   previous ones. Ideally we would reserve 2x
                   max_rpcs_in_flight, but that could waste too much RAM, so
                   2 extra is a guess that should still work. */
                cli->cl_import->imp_rq_pool =
                        ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                            OST_MAXREQSIZE,
                                            ptlrpc_add_rqs_to_pool);

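                /* Per-client grant-shrink state: the list linkage below is
                 * what osc_del_shrink_grant() (see osc_disconnect() above)
                 * eventually removes, and cl_grant_sem appears to serialize
                 * grant-shrink operations against other grant updates. */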
                CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
                sema_init(&cli->cl_grant_sem, 1);
        }

        RETURN(rc);
}

static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
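                /* Stop the pinger from poking this import so that no new
                 * requests race with the teardown. */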
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the client import will
                 * not have been cleaned up, so do it here. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        down_write(&obd->u.cli.cl_sem);
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        if (imp->imp_rq_pool) {
                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
                                imp->imp_rq_pool = NULL;
                        }
                        class_destroy_import(imp);
                        up_write(&obd->u.cli.cl_sem);
                        obd->u.cli.cl_import = NULL;
                }
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        }
        }
        RETURN(rc);
}

int osc_cleanup(struct obd_device *obd)
{
        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
        oscc->oscc_flags |= OSCC_FLAG_EXITING;
        spin_unlock(&oscc->oscc_lock);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}

int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct lprocfs_static_vars lvars = { 0 };
        int rc = 0;

        lprocfs_osc_init_vars(&lvars);

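        /* There are no OSC-specific config commands: anything arriving here
         * is treated as a tunable parameter update. A positive return from
         * class_process_proc_param() just means parameters were handled, so
         * it is mapped to success. */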
        switch (lcfg->lcfg_command) {
        default:
                rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
                                              lcfg, obd);
                if (rc > 0)
                        rc = 0;
                break;
        }

        return rc;
}

static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
        return osc_process_config_base(obd, buf);
}

struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};

extern struct lu_kmem_descr  osc_caches[];
extern spinlock_t            osc_ast_guard;
extern struct lock_class_key osc_ast_guard_class;

int __init osc_init(void)
{
        struct lprocfs_static_vars lvars = { 0 };
        int rc;
        ENTRY;

        /* Print the address of _any_ initialized kernel symbol from this
         * module, to allow debugging with a gdb that doesn't support data
         * symbols from modules. */
        CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);

        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

        lprocfs_osc_init_vars(&lvars);

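        /* The quota hooks live in a separate module; pull it in and splice
         * its methods into osc_obd_ops before the OSC type is registered. */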
        request_module("lquota");
        quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
        lquota_init(quota_interface);
        init_obd_quota_ops(quota_interface, &osc_obd_ops);

        rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc) {
                if (quota_interface)
                        PORTAL_SYMBOL_PUT(osc_quota_interface);
                lu_kmem_fini(osc_caches);
                RETURN(rc);
        }

        spin_lock_init(&osc_ast_guard);
        lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);

        RETURN(rc);
}

#ifdef __KERNEL__
static void /*__exit*/ osc_exit(void)
{
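        /* Tear down in roughly the reverse order of osc_init(). */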
        lu_device_type_fini(&osc_device_type);

        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif