Whamcloud - gitweb
b=18881
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
68
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71                          struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
102         }
103
104         RETURN(lmm_size);
105 }
106
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109                         struct lov_mds_md *lmm, int lmm_bytes)
110 {
111         int lsm_size;
112         ENTRY;
113
114         if (lmm != NULL) {
115                 if (lmm_bytes < sizeof (*lmm)) {
116                         CERROR("lov_mds_md too small: %d, need %d\n",
117                                lmm_bytes, (int)sizeof(*lmm));
118                         RETURN(-EINVAL);
119                 }
120                 /* XXX LOV_MAGIC etc check? */
121
122                 if (lmm->lmm_object_id == 0) {
123                         CERROR("lov_mds_md: zero lmm_object_id\n");
124                         RETURN(-EINVAL);
125                 }
126         }
127
128         lsm_size = lov_stripe_md_size(1);
129         if (lsmp == NULL)
130                 RETURN(lsm_size);
131
132         if (*lsmp != NULL && lmm == NULL) {
133                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134                 OBD_FREE(*lsmp, lsm_size);
135                 *lsmp = NULL;
136                 RETURN(0);
137         }
138
139         if (*lsmp == NULL) {
140                 OBD_ALLOC(*lsmp, lsm_size);
141                 if (*lsmp == NULL)
142                         RETURN(-ENOMEM);
143                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145                         OBD_FREE(*lsmp, lsm_size);
146                         RETURN(-ENOMEM);
147                 }
148                 loi_init((*lsmp)->lsm_oinfo[0]);
149         }
150
151         if (lmm != NULL) {
152                 /* XXX zero *lsmp? */
153                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155                 LASSERT((*lsmp)->lsm_object_id);
156                 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
157         }
158
159         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
214                                   lustre_swab_ost_body);
215         if (body) {
216                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
218
219                 /* This should really be sent by the OST */
220                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222         } else {
223                 CDEBUG(D_INFO, "can't unpack ost_body\n");
224                 rc = -EPROTO;
225                 aa->aa_oi->oi_oa->o_valid = 0;
226         }
227 out:
228         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
229         RETURN(rc);
230 }
231
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233                              struct ptlrpc_request_set *set)
234 {
235         struct ptlrpc_request *req;
236         struct osc_async_args *aa;
237         int                    rc;
238         ENTRY;
239
240         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
241         if (req == NULL)
242                 RETURN(-ENOMEM);
243
244         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246         if (rc) {
247                 ptlrpc_request_free(req);
248                 RETURN(rc);
249         }
250
251         osc_pack_req_body(req, oinfo);
252
253         ptlrpc_request_set_replen(req);
254         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255
256         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257         aa = ptlrpc_req_async_args(req);
258         aa->aa_oi = oinfo;
259
260         ptlrpc_set_add_req(set, req);
261         RETURN(0);
262 }
263
264 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
308                        struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
316                  CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
317                  "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
318                  oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
319
320         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
321         if (req == NULL)
322                 RETURN(-ENOMEM);
323
324         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
326         if (rc) {
327                 ptlrpc_request_free(req);
328                 RETURN(rc);
329         }
330
331         osc_pack_req_body(req, oinfo);
332
333         ptlrpc_request_set_replen(req);
334
335         rc = ptlrpc_queue_wait(req);
336         if (rc)
337                 GOTO(out, rc);
338
339         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
340         if (body == NULL)
341                 GOTO(out, rc = -EPROTO);
342
343         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
344
345         EXIT;
346 out:
347         ptlrpc_req_finished(req);
348         RETURN(rc);
349 }
350
351 static int osc_setattr_interpret(const struct lu_env *env,
352                                  struct ptlrpc_request *req,
353                                  struct osc_async_args *aa, int rc)
354 {
355         struct ost_body *body;
356         ENTRY;
357
358         if (rc != 0)
359                 GOTO(out, rc);
360
361         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
362         if (body == NULL)
363                 GOTO(out, rc = -EPROTO);
364
365         lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
366 out:
367         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
368         RETURN(rc);
369 }
370
371 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
372                              struct obd_trans_info *oti,
373                              struct ptlrpc_request_set *rqset)
374 {
375         struct ptlrpc_request *req;
376         struct osc_async_args *aa;
377         int                    rc;
378         ENTRY;
379
380         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381         if (req == NULL)
382                 RETURN(-ENOMEM);
383
384         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
386         if (rc) {
387                 ptlrpc_request_free(req);
388                 RETURN(rc);
389         }
390
391         osc_pack_req_body(req, oinfo);
392
393         ptlrpc_request_set_replen(req);
394
395         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
396                 LASSERT(oti);
397                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
398         }
399
400         /* do mds to ost setattr asynchronously */
401         if (!rqset) {
402                 /* Do not wait for response. */
403                 ptlrpcd_add_req(req, PSCOPE_OTHER);
404         } else {
405                 req->rq_interpret_reply =
406                         (ptlrpc_interpterer_t)osc_setattr_interpret;
407
408                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
409                 aa = ptlrpc_req_async_args(req);
410                 aa->aa_oi = oinfo;
411
412                 ptlrpc_set_add_req(rqset, req);
413         }
414
415         RETURN(0);
416 }
417
418 int osc_real_create(struct obd_export *exp, struct obdo *oa,
419                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
420 {
421         struct ptlrpc_request *req;
422         struct ost_body       *body;
423         struct lov_stripe_md  *lsm;
424         int                    rc;
425         ENTRY;
426
427         LASSERT(oa);
428         LASSERT(ea);
429
430         lsm = *ea;
431         if (!lsm) {
432                 rc = obd_alloc_memmd(exp, &lsm);
433                 if (rc < 0)
434                         RETURN(rc);
435         }
436
437         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
438         if (req == NULL)
439                 GOTO(out, rc = -ENOMEM);
440
441         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
442         if (rc) {
443                 ptlrpc_request_free(req);
444                 GOTO(out, rc);
445         }
446
447         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
448         LASSERT(body);
449         lustre_set_wire_obdo(&body->oa, oa);
450
451         ptlrpc_request_set_replen(req);
452
453         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
454             oa->o_flags == OBD_FL_DELORPHAN) {
455                 DEBUG_REQ(D_HA, req,
456                           "delorphan from OST integration");
457                 /* Don't resend the delorphan req */
458                 req->rq_no_resend = req->rq_no_delay = 1;
459         }
460
461         rc = ptlrpc_queue_wait(req);
462         if (rc)
463                 GOTO(out_req, rc);
464
465         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
466         if (body == NULL)
467                 GOTO(out_req, rc = -EPROTO);
468
469         lustre_get_wire_obdo(oa, &body->oa);
470
471         /* This should really be sent by the OST */
472         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
473         oa->o_valid |= OBD_MD_FLBLKSZ;
474
475         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
476          * have valid lsm_oinfo data structs, so don't go touching that.
477          * This needs to be fixed in a big way.
478          */
479         lsm->lsm_object_id = oa->o_id;
480         lsm->lsm_object_gr = oa->o_gr;
481         *ea = lsm;
482
483         if (oti != NULL) {
484                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
485
486                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
487                         if (!oti->oti_logcookies)
488                                 oti_alloc_cookies(oti, 1);
489                         *oti->oti_logcookies = oa->o_lcookie;
490                 }
491         }
492
493         CDEBUG(D_HA, "transno: "LPD64"\n",
494                lustre_msg_get_transno(req->rq_repmsg));
495 out_req:
496         ptlrpc_req_finished(req);
497 out:
498         if (rc && !*ea)
499                 obd_free_memmd(exp, &lsm);
500         RETURN(rc);
501 }
502
503 static int osc_punch_interpret(const struct lu_env *env,
504                                struct ptlrpc_request *req,
505                                struct osc_punch_args *aa, int rc)
506 {
507         struct ost_body *body;
508         ENTRY;
509
510         if (rc != 0)
511                 GOTO(out, rc);
512
513         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
514         if (body == NULL)
515                 GOTO(out, rc = -EPROTO);
516
517         lustre_get_wire_obdo(aa->pa_oa, &body->oa);
518 out:
519         rc = aa->pa_upcall(aa->pa_cookie, rc);
520         RETURN(rc);
521 }
522
523 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
524                    struct obd_capa *capa,
525                    obd_enqueue_update_f upcall, void *cookie,
526                    struct ptlrpc_request_set *rqset)
527 {
528         struct ptlrpc_request *req;
529         struct osc_punch_args *aa;
530         struct ost_body       *body;
531         int                    rc;
532         ENTRY;
533
534         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
535         if (req == NULL)
536                 RETURN(-ENOMEM);
537
538         osc_set_capa_size(req, &RMF_CAPA1, capa);
539         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
540         if (rc) {
541                 ptlrpc_request_free(req);
542                 RETURN(rc);
543         }
544         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
545         ptlrpc_at_set_req_timeout(req);
546
547         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
548         LASSERT(body);
549         lustre_set_wire_obdo(&body->oa, oa);
550         osc_pack_capa(req, body, capa);
551
552         ptlrpc_request_set_replen(req);
553
554
555         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
556         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
557         aa = ptlrpc_req_async_args(req);
558         aa->pa_oa     = oa;
559         aa->pa_upcall = upcall;
560         aa->pa_cookie = cookie;
561         if (rqset == PTLRPCD_SET)
562                 ptlrpcd_add_req(req, PSCOPE_OTHER);
563         else
564                 ptlrpc_set_add_req(rqset, req);
565
566         RETURN(0);
567 }
568
569 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
570                      struct obd_trans_info *oti,
571                      struct ptlrpc_request_set *rqset)
572 {
573         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
574         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
575         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
576         return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
577                               oinfo->oi_cb_up, oinfo, rqset);
578 }
579
580 static int osc_sync(struct obd_export *exp, struct obdo *oa,
581                     struct lov_stripe_md *md, obd_size start, obd_size end,
582                     void *capa)
583 {
584         struct ptlrpc_request *req;
585         struct ost_body       *body;
586         int                    rc;
587         ENTRY;
588
589         if (!oa) {
590                 CDEBUG(D_INFO, "oa NULL\n");
591                 RETURN(-EINVAL);
592         }
593
594         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
595         if (req == NULL)
596                 RETURN(-ENOMEM);
597
598         osc_set_capa_size(req, &RMF_CAPA1, capa);
599         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
600         if (rc) {
601                 ptlrpc_request_free(req);
602                 RETURN(rc);
603         }
604
605         /* overload the size and blocks fields in the oa with start/end */
606         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
607         LASSERT(body);
608         lustre_set_wire_obdo(&body->oa, oa);
609         body->oa.o_size = start;
610         body->oa.o_blocks = end;
611         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
612         osc_pack_capa(req, body, capa);
613
614         ptlrpc_request_set_replen(req);
615
616         rc = ptlrpc_queue_wait(req);
617         if (rc)
618                 GOTO(out, rc);
619
620         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
621         if (body == NULL)
622                 GOTO(out, rc = -EPROTO);
623
624         lustre_get_wire_obdo(oa, &body->oa);
625
626         EXIT;
627  out:
628         ptlrpc_req_finished(req);
629         return rc;
630 }
631
632 /* Find and cancel locally locks matched by @mode in the resource found by
633  * @objid. Found locks are added into @cancel list. Returns the amount of
634  * locks added to @cancels list. */
635 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
636                                    struct list_head *cancels, ldlm_mode_t mode,
637                                    int lock_flags)
638 {
639         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
640         struct ldlm_res_id res_id;
641         struct ldlm_resource *res;
642         int count;
643         ENTRY;
644
645         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
646         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
647         if (res == NULL)
648                 RETURN(0);
649
650         LDLM_RESOURCE_ADDREF(res);
651         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
652                                            lock_flags, 0, NULL);
653         LDLM_RESOURCE_DELREF(res);
654         ldlm_resource_putref(res);
655         RETURN(count);
656 }
657
658 static int osc_destroy_interpret(const struct lu_env *env,
659                                  struct ptlrpc_request *req, void *data,
660                                  int rc)
661 {
662         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
663
664         atomic_dec(&cli->cl_destroy_in_flight);
665         cfs_waitq_signal(&cli->cl_destroy_waitq);
666         return 0;
667 }
668
669 static int osc_can_send_destroy(struct client_obd *cli)
670 {
671         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
672             cli->cl_max_rpcs_in_flight) {
673                 /* The destroy request can be sent */
674                 return 1;
675         }
676         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
677             cli->cl_max_rpcs_in_flight) {
678                 /*
679                  * The counter has been modified between the two atomic
680                  * operations.
681                  */
682                 cfs_waitq_signal(&cli->cl_destroy_waitq);
683         }
684         return 0;
685 }
686
687 /* Destroy requests can be async always on the client, and we don't even really
688  * care about the return code since the client cannot do anything at all about
689  * a destroy failure.
690  * When the MDS is unlinking a filename, it saves the file objects into a
691  * recovery llog, and these object records are cancelled when the OST reports
692  * they were destroyed and sync'd to disk (i.e. transaction committed).
693  * If the client dies, or the OST is down when the object should be destroyed,
694  * the records are not cancelled, and when the OST reconnects to the MDS next,
695  * it will retrieve the llog unlink logs and then sends the log cancellation
696  * cookies to the MDS after committing destroy transactions. */
697 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
698                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
699                        struct obd_export *md_export, void *capa)
700 {
701         struct client_obd     *cli = &exp->exp_obd->u.cli;
702         struct ptlrpc_request *req;
703         struct ost_body       *body;
704         CFS_LIST_HEAD(cancels);
705         int rc, count;
706         ENTRY;
707
708         if (!oa) {
709                 CDEBUG(D_INFO, "oa NULL\n");
710                 RETURN(-EINVAL);
711         }
712
713         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
714                                         LDLM_FL_DISCARD_DATA);
715
716         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
717         if (req == NULL) {
718                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
719                 RETURN(-ENOMEM);
720         }
721
722         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
723         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
724                                0, &cancels, count);
725         if (rc) {
726                 ptlrpc_request_free(req);
727                 RETURN(rc);
728         }
729
730         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
731         ptlrpc_at_set_req_timeout(req);
732
733         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
734                 oa->o_lcookie = *oti->oti_logcookies;
735         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
736         LASSERT(body);
737         lustre_set_wire_obdo(&body->oa, oa);
738
739         osc_pack_capa(req, body, (struct obd_capa *)capa);
740         ptlrpc_request_set_replen(req);
741
742         /* don't throttle destroy RPCs for the MDT */
743         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
744                 req->rq_interpret_reply = osc_destroy_interpret;
745                 if (!osc_can_send_destroy(cli)) {
746                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
747                                                           NULL);
748
749                         /*
750                          * Wait until the number of on-going destroy RPCs drops
751                          * under max_rpc_in_flight
752                          */
753                         l_wait_event_exclusive(cli->cl_destroy_waitq,
754                                                osc_can_send_destroy(cli), &lwi);
755                 }
756         }
757
758         /* Do not wait for response */
759         ptlrpcd_add_req(req, PSCOPE_OTHER);
760         RETURN(0);
761 }
762
763 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
764                                 long writing_bytes)
765 {
766         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
767
768         LASSERT(!(oa->o_valid & bits));
769
770         oa->o_valid |= bits;
771         client_obd_list_lock(&cli->cl_loi_list_lock);
772         oa->o_dirty = cli->cl_dirty;
773         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
774                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
775                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
776                 oa->o_undirty = 0;
777         } else if (atomic_read(&obd_dirty_pages) -
778                    atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
779                 CERROR("dirty %d - %d > system dirty_max %d\n",
780                        atomic_read(&obd_dirty_pages),
781                        atomic_read(&obd_dirty_transit_pages),
782                        obd_max_dirty_pages);
783                 oa->o_undirty = 0;
784         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
785                 CERROR("dirty %lu - dirty_max %lu too big???\n",
786                        cli->cl_dirty, cli->cl_dirty_max);
787                 oa->o_undirty = 0;
788         } else {
789                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
790                                 (cli->cl_max_rpcs_in_flight + 1);
791                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
792         }
793         oa->o_grant = cli->cl_avail_grant;
794         oa->o_dropped = cli->cl_lost_grant;
795         cli->cl_lost_grant = 0;
796         client_obd_list_unlock(&cli->cl_loi_list_lock);
797         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
798                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
799
800 }
801
802 static void osc_update_next_shrink(struct client_obd *cli)
803 {
804         cli->cl_next_shrink_grant =
805                 cfs_time_shift(cli->cl_grant_shrink_interval);
806         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
807                cli->cl_next_shrink_grant);
808 }
809
810 /* caller must hold loi_list_lock */
811 static void osc_consume_write_grant(struct client_obd *cli,
812                                     struct brw_page *pga)
813 {
814         LASSERT(client_obd_list_is_locked(&cli->cl_loi_list_lock));
815         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
816         atomic_inc(&obd_dirty_pages);
817         cli->cl_dirty += CFS_PAGE_SIZE;
818         cli->cl_avail_grant -= CFS_PAGE_SIZE;
819         pga->flag |= OBD_BRW_FROM_GRANT;
820         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
821                CFS_PAGE_SIZE, pga, pga->pg);
822         LASSERT(cli->cl_avail_grant >= 0);
823         osc_update_next_shrink(cli);
824 }
825
826 /* the companion to osc_consume_write_grant, called when a brw has completed.
827  * must be called with the loi lock held. */
828 static void osc_release_write_grant(struct client_obd *cli,
829                                     struct brw_page *pga, int sent)
830 {
831         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
832         ENTRY;
833
834         LASSERT(client_obd_list_is_locked(&cli->cl_loi_list_lock));
835         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
836                 EXIT;
837                 return;
838         }
839
840         pga->flag &= ~OBD_BRW_FROM_GRANT;
841         atomic_dec(&obd_dirty_pages);
842         cli->cl_dirty -= CFS_PAGE_SIZE;
843         if (pga->flag & OBD_BRW_NOCACHE) {
844                 pga->flag &= ~OBD_BRW_NOCACHE;
845                 atomic_dec(&obd_dirty_transit_pages);
846                 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
847         }
848         if (!sent) {
849                 cli->cl_lost_grant += CFS_PAGE_SIZE;
850                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
851                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
852         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
853                 /* For short writes we shouldn't count parts of pages that
854                  * span a whole block on the OST side, or our accounting goes
855                  * wrong.  Should match the code in filter_grant_check. */
856                 int offset = pga->off & ~CFS_PAGE_MASK;
857                 int count = pga->count + (offset & (blocksize - 1));
858                 int end = (offset + pga->count) & (blocksize - 1);
859                 if (end)
860                         count += blocksize - end;
861
862                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
863                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
864                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
865                        cli->cl_avail_grant, cli->cl_dirty);
866         }
867
868         EXIT;
869 }
870
871 static unsigned long rpcs_in_flight(struct client_obd *cli)
872 {
873         return cli->cl_r_in_flight + cli->cl_w_in_flight;
874 }
875
876 /* caller must hold loi_list_lock */
877 void osc_wake_cache_waiters(struct client_obd *cli)
878 {
879         struct list_head *l, *tmp;
880         struct osc_cache_waiter *ocw;
881
882         ENTRY;
883         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
884                 /* if we can't dirty more, we must wait until some is written */
885                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
886                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
887                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
888                                "osc max %ld, sys max %d\n", cli->cl_dirty,
889                                cli->cl_dirty_max, obd_max_dirty_pages);
890                         return;
891                 }
892
893                 /* if still dirty cache but no grant wait for pending RPCs that
894                  * may yet return us some grant before doing sync writes */
895                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
896                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
897                                cli->cl_w_in_flight);
898                         return;
899                 }
900
901                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
902                 list_del_init(&ocw->ocw_entry);
903                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
904                         /* no more RPCs in flight to return grant, do sync IO */
905                         ocw->ocw_rc = -EDQUOT;
906                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
907                 } else {
908                         osc_consume_write_grant(cli,
909                                                 &ocw->ocw_oap->oap_brw_page);
910                 }
911
912                 cfs_waitq_signal(&ocw->ocw_waitq);
913         }
914
915         EXIT;
916 }
917
918 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
919 {
920         client_obd_list_lock(&cli->cl_loi_list_lock);
921         cli->cl_avail_grant += grant;
922         client_obd_list_unlock(&cli->cl_loi_list_lock);
923 }
924
925 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
926 {
927         if (body->oa.o_valid & OBD_MD_FLGRANT) {
928                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
929                 __osc_update_grant(cli, body->oa.o_grant);
930         }
931 }
932
933 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
934                               void *key, obd_count vallen, void *val,
935                               struct ptlrpc_request_set *set);
936
937 static int osc_shrink_grant_interpret(const struct lu_env *env,
938                                       struct ptlrpc_request *req,
939                                       void *aa, int rc)
940 {
941         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
942         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
943         struct ost_body *body;
944
945         if (rc != 0) {
946                 __osc_update_grant(cli, oa->o_grant);
947                 GOTO(out, rc);
948         }
949
950         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
951         LASSERT(body);
952         osc_update_grant(cli, body);
953 out:
954         OBD_FREE_PTR(oa);
955         return rc;
956 }
957
958 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
959 {
960         client_obd_list_lock(&cli->cl_loi_list_lock);
961         oa->o_grant = cli->cl_avail_grant / 4;
962         cli->cl_avail_grant -= oa->o_grant;
963         client_obd_list_unlock(&cli->cl_loi_list_lock);
964         oa->o_flags |= OBD_FL_SHRINK_GRANT;
965         osc_update_next_shrink(cli);
966 }
967
968 /* Shrink the current grant, either from some large amount to enough for a
969  * full set of in-flight RPCs, or if we have already shrunk to that limit
970  * then to enough for a single RPC.  This avoids keeping more grant than
971  * needed, and avoids shrinking the grant piecemeal. */
972 static int osc_shrink_grant(struct client_obd *cli)
973 {
974         long target = (cli->cl_max_rpcs_in_flight + 1) *
975                       cli->cl_max_pages_per_rpc;
976
977         client_obd_list_lock(&cli->cl_loi_list_lock);
978         if (cli->cl_avail_grant <= target)
979                 target = cli->cl_max_pages_per_rpc;
980         client_obd_list_unlock(&cli->cl_loi_list_lock);
981
982         return osc_shrink_grant_to_target(cli, target);
983 }
984
985 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
986 {
987         int    rc = 0;
988         struct ost_body     *body;
989         ENTRY;
990
991         client_obd_list_lock(&cli->cl_loi_list_lock);
992         /* Don't shrink if we are already above or below the desired limit
993          * We don't want to shrink below a single RPC, as that will negatively
994          * impact block allocation and long-term performance. */
995         if (target < cli->cl_max_pages_per_rpc)
996                 target = cli->cl_max_pages_per_rpc;
997
998         if (target >= cli->cl_avail_grant) {
999                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1000                 RETURN(0);
1001         }
1002         client_obd_list_unlock(&cli->cl_loi_list_lock);
1003
1004         OBD_ALLOC_PTR(body);
1005         if (!body)
1006                 RETURN(-ENOMEM);
1007
1008         osc_announce_cached(cli, &body->oa, 0);
1009
1010         client_obd_list_lock(&cli->cl_loi_list_lock);
1011         body->oa.o_grant = cli->cl_avail_grant - target;
1012         cli->cl_avail_grant = target;
1013         client_obd_list_unlock(&cli->cl_loi_list_lock);
1014         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1015         osc_update_next_shrink(cli);
1016
1017         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1018                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1019                                 sizeof(*body), body, NULL);
1020         if (rc != 0)
1021                 __osc_update_grant(cli, body->oa.o_grant);
1022         OBD_FREE_PTR(body);
1023         RETURN(rc);
1024 }
1025
1026 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1027 static int osc_should_shrink_grant(struct client_obd *client)
1028 {
1029         cfs_time_t time = cfs_time_current();
1030         cfs_time_t next_shrink = client->cl_next_shrink_grant;
1031         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1032                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1033                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1034                         return 1;
1035                 else
1036                         osc_update_next_shrink(client);
1037         }
1038         return 0;
1039 }
1040
1041 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1042 {
1043         struct client_obd *client;
1044
1045         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1046                 if (osc_should_shrink_grant(client))
1047                         osc_shrink_grant(client);
1048         }
1049         return 0;
1050 }
1051
1052 static int osc_add_shrink_grant(struct client_obd *client)
1053 {
1054         int rc;
1055
1056         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1057                                        TIMEOUT_GRANT,
1058                                        osc_grant_shrink_grant_cb, NULL,
1059                                        &client->cl_grant_shrink_list);
1060         if (rc) {
1061                 CERROR("add grant client %s error %d\n",
1062                         client->cl_import->imp_obd->obd_name, rc);
1063                 return rc;
1064         }
1065         CDEBUG(D_CACHE, "add grant client %s \n",
1066                client->cl_import->imp_obd->obd_name);
1067         osc_update_next_shrink(client);
1068         return 0;
1069 }
1070
1071 static int osc_del_shrink_grant(struct client_obd *client)
1072 {
1073         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1074                                          TIMEOUT_GRANT);
1075 }
1076
1077 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1078 {
1079         client_obd_list_lock(&cli->cl_loi_list_lock);
1080         cli->cl_avail_grant = ocd->ocd_grant;
1081         client_obd_list_unlock(&cli->cl_loi_list_lock);
1082
1083         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1084             list_empty(&cli->cl_grant_shrink_list))
1085                 osc_add_shrink_grant(cli);
1086
1087         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1088                cli->cl_avail_grant, cli->cl_lost_grant);
1089         LASSERT(cli->cl_avail_grant >= 0);
1090 }
1091
1092 /* We assume that the reason this OSC got a short read is because it read
1093  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1094  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1095  * this stripe never got written at or beyond this stripe offset yet. */
1096 static void handle_short_read(int nob_read, obd_count page_count,
1097                               struct brw_page **pga)
1098 {
1099         char *ptr;
1100         int i = 0;
1101
1102         /* skip bytes read OK */
1103         while (nob_read > 0) {
1104                 LASSERT (page_count > 0);
1105
1106                 if (pga[i]->count > nob_read) {
1107                         /* EOF inside this page */
1108                         ptr = cfs_kmap(pga[i]->pg) +
1109                                 (pga[i]->off & ~CFS_PAGE_MASK);
1110                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1111                         cfs_kunmap(pga[i]->pg);
1112                         page_count--;
1113                         i++;
1114                         break;
1115                 }
1116
1117                 nob_read -= pga[i]->count;
1118                 page_count--;
1119                 i++;
1120         }
1121
1122         /* zero remaining pages */
1123         while (page_count-- > 0) {
1124                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1125                 memset(ptr, 0, pga[i]->count);
1126                 cfs_kunmap(pga[i]->pg);
1127                 i++;
1128         }
1129 }
1130
1131 static int check_write_rcs(struct ptlrpc_request *req,
1132                            int requested_nob, int niocount,
1133                            obd_count page_count, struct brw_page **pga)
1134 {
1135         int    *remote_rcs, i;
1136
1137         /* return error if any niobuf was in error */
1138         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1139                                         sizeof(*remote_rcs) * niocount, NULL);
1140         if (remote_rcs == NULL) {
1141                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1142                 return(-EPROTO);
1143         }
1144         if (lustre_msg_swabbed(req->rq_repmsg))
1145                 for (i = 0; i < niocount; i++)
1146                         __swab32s(&remote_rcs[i]);
1147
1148         for (i = 0; i < niocount; i++) {
1149                 if (remote_rcs[i] < 0)
1150                         return(remote_rcs[i]);
1151
1152                 if (remote_rcs[i] != 0) {
1153                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1154                                 i, remote_rcs[i], req);
1155                         return(-EPROTO);
1156                 }
1157         }
1158
1159         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1160                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1161                        req->rq_bulk->bd_nob_transferred, requested_nob);
1162                 return(-EPROTO);
1163         }
1164
1165         return (0);
1166 }
1167
1168 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1169 {
1170         if (p1->flag != p2->flag) {
1171                 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1172                                   OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1173
1174                 /* warn if we try to combine flags that we don't know to be
1175                  * safe to combine */
1176                 if ((p1->flag & mask) != (p2->flag & mask))
1177                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1178                                "same brw?\n", p1->flag, p2->flag);
1179                 return 0;
1180         }
1181
1182         return (p1->off + p1->count == p2->off);
1183 }
1184
1185 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1186                                    struct brw_page **pga, int opc,
1187                                    cksum_type_t cksum_type)
1188 {
1189         __u32 cksum;
1190         int i = 0;
1191
1192         LASSERT (pg_count > 0);
1193         cksum = init_checksum(cksum_type);
1194         while (nob > 0 && pg_count > 0) {
1195                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1196                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1197                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1198
1199                 /* corrupt the data before we compute the checksum, to
1200                  * simulate an OST->client data error */
1201                 if (i == 0 && opc == OST_READ &&
1202                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1203                         memcpy(ptr + off, "bad1", min(4, nob));
1204                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1205                 cfs_kunmap(pga[i]->pg);
1206                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1207                                off, cksum);
1208
1209                 nob -= pga[i]->count;
1210                 pg_count--;
1211                 i++;
1212         }
1213         /* For sending we only compute the wrong checksum instead
1214          * of corrupting the data so it is still correct on a redo */
1215         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1216                 cksum++;
1217
1218         return cksum;
1219 }
1220
1221 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1222                                 struct lov_stripe_md *lsm, obd_count page_count,
1223                                 struct brw_page **pga,
1224                                 struct ptlrpc_request **reqp,
1225                                 struct obd_capa *ocapa, int reserve)
1226 {
1227         struct ptlrpc_request   *req;
1228         struct ptlrpc_bulk_desc *desc;
1229         struct ost_body         *body;
1230         struct obd_ioobj        *ioobj;
1231         struct niobuf_remote    *niobuf;
1232         int niocount, i, requested_nob, opc, rc;
1233         struct osc_brw_async_args *aa;
1234         struct req_capsule      *pill;
1235         struct brw_page *pg_prev;
1236
1237         ENTRY;
1238         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1239                 RETURN(-ENOMEM); /* Recoverable */
1240         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1241                 RETURN(-EINVAL); /* Fatal */
1242
1243         if ((cmd & OBD_BRW_WRITE) != 0) {
1244                 opc = OST_WRITE;
1245                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1246                                                 cli->cl_import->imp_rq_pool,
1247                                                 &RQF_OST_BRW);
1248         } else {
1249                 opc = OST_READ;
1250                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1251         }
1252         if (req == NULL)
1253                 RETURN(-ENOMEM);
1254
1255         for (niocount = i = 1; i < page_count; i++) {
1256                 if (!can_merge_pages(pga[i - 1], pga[i]))
1257                         niocount++;
1258         }
1259
1260         pill = &req->rq_pill;
1261         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1262                              niocount * sizeof(*niobuf));
1263         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1264
1265         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1266         if (rc) {
1267                 ptlrpc_request_free(req);
1268                 RETURN(rc);
1269         }
1270         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1271         ptlrpc_at_set_req_timeout(req);
1272
1273         if (opc == OST_WRITE)
1274                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1275                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1276         else
1277                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1278                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1279
1280         if (desc == NULL)
1281                 GOTO(out, rc = -ENOMEM);
1282         /* NB request now owns desc and will free it when it gets freed */
1283
1284         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1285         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1286         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1287         LASSERT(body && ioobj && niobuf);
1288
1289         lustre_set_wire_obdo(&body->oa, oa);
1290
1291         obdo_to_ioobj(oa, ioobj);
1292         ioobj->ioo_bufcnt = niocount;
1293         osc_pack_capa(req, body, ocapa);
1294         LASSERT (page_count > 0);
1295         pg_prev = pga[0];
1296         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1297                 struct brw_page *pg = pga[i];
1298
1299                 LASSERT(pg->count > 0);
1300                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1301                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1302                          pg->off, pg->count);
1303 #ifdef __linux__
1304                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1305                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1306                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1307                          i, page_count,
1308                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1309                          pg_prev->pg, page_private(pg_prev->pg),
1310                          pg_prev->pg->index, pg_prev->off);
1311 #else
1312                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1313                          "i %d p_c %u\n", i, page_count);
1314 #endif
1315                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1316                         (pg->flag & OBD_BRW_SRVLOCK));
1317
1318                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1319                                       pg->count);
1320                 requested_nob += pg->count;
1321
1322                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1323                         niobuf--;
1324                         niobuf->len += pg->count;
1325                 } else {
1326                         niobuf->offset = pg->off;
1327                         niobuf->len    = pg->count;
1328                         niobuf->flags  = pg->flag;
1329                 }
1330                 pg_prev = pg;
1331         }
1332
1333         LASSERTF((void *)(niobuf - niocount) ==
1334                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1335                                niocount * sizeof(*niobuf)),
1336                 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1337                 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1338                 (void *)(niobuf - niocount));
1339
1340         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1341         if (osc_should_shrink_grant(cli))
1342                 osc_shrink_grant_local(cli, &body->oa);
1343
1344         /* size[REQ_REC_OFF] still sizeof (*body) */
1345         if (opc == OST_WRITE) {
1346                 if (unlikely(cli->cl_checksum) &&
1347                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1348                         /* store cl_cksum_type in a local variable since
1349                          * it can be changed via lprocfs */
1350                         cksum_type_t cksum_type = cli->cl_cksum_type;
1351
1352                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1353                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1354                                 body->oa.o_flags = 0;
1355                         }
1356                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1357                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1358                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1359                                                              page_count, pga,
1360                                                              OST_WRITE,
1361                                                              cksum_type);
1362                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1363                                body->oa.o_cksum);
1364                         /* save this in 'oa', too, for later checking */
1365                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1366                         oa->o_flags |= cksum_type_pack(cksum_type);
1367                 } else {
1368                         /* clear out the checksum flag, in case this is a
1369                          * resend but cl_checksum is no longer set. b=11238 */
1370                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1371                 }
1372                 oa->o_cksum = body->oa.o_cksum;
1373                 /* 1 RC per niobuf */
1374                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1375                                      sizeof(__u32) * niocount);
1376         } else {
1377                 if (unlikely(cli->cl_checksum) &&
1378                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1379                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1380                                 body->oa.o_flags = 0;
1381                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1382                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1383                 }
1384                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1385                 /* 1 RC for the whole I/O */
1386         }
1387         ptlrpc_request_set_replen(req);
1388
1389         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1390         aa = ptlrpc_req_async_args(req);
1391         aa->aa_oa = oa;
1392         aa->aa_requested_nob = requested_nob;
1393         aa->aa_nio_count = niocount;
1394         aa->aa_page_count = page_count;
1395         aa->aa_resends = 0;
1396         aa->aa_ppga = pga;
1397         aa->aa_cli = cli;
1398         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1399         if (ocapa && reserve)
1400                 aa->aa_ocapa = capa_get(ocapa);
1401
1402         *reqp = req;
1403         RETURN(0);
1404
1405  out:
1406         ptlrpc_req_finished(req);
1407         RETURN(rc);
1408 }
1409
1410 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1411                                 __u32 client_cksum, __u32 server_cksum, int nob,
1412                                 obd_count page_count, struct brw_page **pga,
1413                                 cksum_type_t client_cksum_type)
1414 {
1415         __u32 new_cksum;
1416         char *msg;
1417         cksum_type_t cksum_type;
1418
1419         if (server_cksum == client_cksum) {
1420                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1421                 return 0;
1422         }
1423
1424         if (oa->o_valid & OBD_MD_FLFLAGS)
1425                 cksum_type = cksum_type_unpack(oa->o_flags);
1426         else
1427                 cksum_type = OBD_CKSUM_CRC32;
1428
1429         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1430                                       cksum_type);
1431
1432         if (cksum_type != client_cksum_type)
1433                 msg = "the server did not use the checksum type specified in "
1434                       "the original request - likely a protocol problem";
1435         else if (new_cksum == server_cksum)
1436                 msg = "changed on the client after we checksummed it - "
1437                       "likely false positive due to mmap IO (bug 11742)";
1438         else if (new_cksum == client_cksum)
1439                 msg = "changed in transit before arrival at OST";
1440         else
1441                 msg = "changed in transit AND doesn't match the original - "
1442                       "likely false positive due to mmap IO (bug 11742)";
1443
1444         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1445                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1446                            "["LPU64"-"LPU64"]\n",
1447                            msg, libcfs_nid2str(peer->nid),
1448                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1449                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1450                                                         (__u64)0,
1451                            oa->o_id,
1452                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1453                            pga[0]->off,
1454                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1455         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1456                "client csum now %x\n", client_cksum, client_cksum_type,
1457                server_cksum, cksum_type, new_cksum);
1458         return 1;
1459 }
1460
1461 /* Note rc enters this function as number of bytes transferred */
1462 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1463 {
1464         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1465         const lnet_process_id_t *peer =
1466                         &req->rq_import->imp_connection->c_peer;
1467         struct client_obd *cli = aa->aa_cli;
1468         struct ost_body *body;
1469         __u32 client_cksum = 0;
1470         ENTRY;
1471
1472         if (rc < 0 && rc != -EDQUOT)
1473                 RETURN(rc);
1474
1475         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1476         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1477                                   lustre_swab_ost_body);
1478         if (body == NULL) {
1479                 CDEBUG(D_INFO, "Can't unpack body\n");
1480                 RETURN(-EPROTO);
1481         }
1482
1483         /* set/clear over quota flag for a uid/gid */
1484         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1485             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1486                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1487
1488                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1489                              body->oa.o_flags);
1490         }
1491
1492         if (rc < 0)
1493                 RETURN(rc);
1494
1495         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1496                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1497
1498         osc_update_grant(cli, body);
1499
1500         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1501                 if (rc > 0) {
1502                         CERROR("Unexpected +ve rc %d\n", rc);
1503                         RETURN(-EPROTO);
1504                 }
1505                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1506
1507                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1508                         RETURN(-EAGAIN);
1509
1510                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1511                     check_write_checksum(&body->oa, peer, client_cksum,
1512                                          body->oa.o_cksum, aa->aa_requested_nob,
1513                                          aa->aa_page_count, aa->aa_ppga,
1514                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1515                         RETURN(-EAGAIN);
1516
1517                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1518                                      aa->aa_page_count, aa->aa_ppga);
1519                 GOTO(out, rc);
1520         }
1521
1522         /* The rest of this function executes only for OST_READs */
1523
1524         /* if unwrap_bulk failed, return -EAGAIN to retry */
1525         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1526         if (rc < 0)
1527                 GOTO(out, rc = -EAGAIN);
1528
1529         if (rc > aa->aa_requested_nob) {
1530                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1531                        aa->aa_requested_nob);
1532                 RETURN(-EPROTO);
1533         }
1534
1535         if (rc != req->rq_bulk->bd_nob_transferred) {
1536                 CERROR ("Unexpected rc %d (%d transferred)\n",
1537                         rc, req->rq_bulk->bd_nob_transferred);
1538                 return (-EPROTO);
1539         }
1540
1541         if (rc < aa->aa_requested_nob)
1542                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1543
1544         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1545                 static int cksum_counter;
1546                 __u32      server_cksum = body->oa.o_cksum;
1547                 char      *via;
1548                 char      *router;
1549                 cksum_type_t cksum_type;
1550
1551                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1552                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1553                 else
1554                         cksum_type = OBD_CKSUM_CRC32;
1555                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1556                                                  aa->aa_ppga, OST_READ,
1557                                                  cksum_type);
1558
1559                 if (peer->nid == req->rq_bulk->bd_sender) {
1560                         via = router = "";
1561                 } else {
1562                         via = " via ";
1563                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1564                 }
1565
1566                 if (server_cksum == ~0 && rc > 0) {
1567                         CERROR("Protocol error: server %s set the 'checksum' "
1568                                "bit, but didn't send a checksum.  Not fatal, "
1569                                "but please notify on http://bugzilla.lustre.org/\n",
1570                                libcfs_nid2str(peer->nid));
1571                 } else if (server_cksum != client_cksum) {
1572                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1573                                            "%s%s%s inum "LPU64"/"LPU64" object "
1574                                            LPU64"/"LPU64" extent "
1575                                            "["LPU64"-"LPU64"]\n",
1576                                            req->rq_import->imp_obd->obd_name,
1577                                            libcfs_nid2str(peer->nid),
1578                                            via, router,
1579                                            body->oa.o_valid & OBD_MD_FLFID ?
1580                                                 body->oa.o_fid : (__u64)0,
1581                                            body->oa.o_valid & OBD_MD_FLFID ?
1582                                                 body->oa.o_generation :(__u64)0,
1583                                            body->oa.o_id,
1584                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1585                                                 body->oa.o_gr : (__u64)0,
1586                                            aa->aa_ppga[0]->off,
1587                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1588                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1589                                                                         1);
1590                         CERROR("client %x, server %x, cksum_type %x\n",
1591                                client_cksum, server_cksum, cksum_type);
1592                         cksum_counter = 0;
1593                         aa->aa_oa->o_cksum = client_cksum;
1594                         rc = -EAGAIN;
1595                 } else {
1596                         cksum_counter++;
1597                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1598                         rc = 0;
1599                 }
1600         } else if (unlikely(client_cksum)) {
1601                 static int cksum_missed;
1602
1603                 cksum_missed++;
1604                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1605                         CERROR("Checksum %u requested from %s but not sent\n",
1606                                cksum_missed, libcfs_nid2str(peer->nid));
1607         } else {
1608                 rc = 0;
1609         }
1610 out:
1611         if (rc >= 0)
1612                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1613
1614         RETURN(rc);
1615 }
1616
1617 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1618                             struct lov_stripe_md *lsm,
1619                             obd_count page_count, struct brw_page **pga,
1620                             struct obd_capa *ocapa)
1621 {
1622         struct ptlrpc_request *req;
1623         int                    rc;
1624         cfs_waitq_t            waitq;
1625         int                    resends = 0;
1626         struct l_wait_info     lwi;
1627
1628         ENTRY;
1629
1630         cfs_waitq_init(&waitq);
1631
1632 restart_bulk:
1633         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1634                                   page_count, pga, &req, ocapa, 0);
1635         if (rc != 0)
1636                 return (rc);
1637
1638         rc = ptlrpc_queue_wait(req);
1639
1640         if (rc == -ETIMEDOUT && req->rq_resend) {
1641                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1642                 ptlrpc_req_finished(req);
1643                 goto restart_bulk;
1644         }
1645
1646         rc = osc_brw_fini_request(req, rc);
1647
1648         ptlrpc_req_finished(req);
1649         if (osc_recoverable_error(rc)) {
1650                 resends++;
1651                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1652                         CERROR("too many resend retries, returning error\n");
1653                         RETURN(-EIO);
1654                 }
1655
1656                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1657                 l_wait_event(waitq, 0, &lwi);
1658
1659                 goto restart_bulk;
1660         }
1661
1662         RETURN (rc);
1663 }
1664
1665 int osc_brw_redo_request(struct ptlrpc_request *request,
1666                          struct osc_brw_async_args *aa)
1667 {
1668         struct ptlrpc_request *new_req;
1669         struct ptlrpc_request_set *set = request->rq_set;
1670         struct osc_brw_async_args *new_aa;
1671         struct osc_async_page *oap;
1672         int rc = 0;
1673         ENTRY;
1674
1675         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1676                 CERROR("too many resend retries, returning error\n");
1677                 RETURN(-EIO);
1678         }
1679
1680         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1681
1682         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1683                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1684                                   aa->aa_cli, aa->aa_oa,
1685                                   NULL /* lsm unused by osc currently */,
1686                                   aa->aa_page_count, aa->aa_ppga,
1687                                   &new_req, aa->aa_ocapa, 0);
1688         if (rc)
1689                 RETURN(rc);
1690
1691         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1692
1693         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1694                 if (oap->oap_request != NULL) {
1695                         LASSERTF(request == oap->oap_request,
1696                                  "request %p != oap_request %p\n",
1697                                  request, oap->oap_request);
1698                         if (oap->oap_interrupted) {
1699                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1700                                 ptlrpc_req_finished(new_req);
1701                                 RETURN(-EINTR);
1702                         }
1703                 }
1704         }
1705         /* New request takes over pga and oaps from old request.
1706          * Note that copying a list_head doesn't work, need to move it... */
1707         aa->aa_resends++;
1708         new_req->rq_interpret_reply = request->rq_interpret_reply;
1709         new_req->rq_async_args = request->rq_async_args;
1710         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1711
1712         new_aa = ptlrpc_req_async_args(new_req);
1713
1714         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1715         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1716         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1717
1718         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1719                 if (oap->oap_request) {
1720                         ptlrpc_req_finished(oap->oap_request);
1721                         oap->oap_request = ptlrpc_request_addref(new_req);
1722                 }
1723         }
1724
1725         new_aa->aa_ocapa = aa->aa_ocapa;
1726         aa->aa_ocapa = NULL;
1727
1728         /* use ptlrpc_set_add_req is safe because interpret functions work
1729          * in check_set context. only one way exist with access to request
1730          * from different thread got -EINTR - this way protected with
1731          * cl_loi_list_lock */
1732         ptlrpc_set_add_req(set, new_req);
1733
1734         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1735
1736         DEBUG_REQ(D_INFO, new_req, "new request");
1737         RETURN(0);
1738 }
1739
1740 /*
1741  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1742  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1743  * fine for our small page arrays and doesn't require allocation.  its an
1744  * insertion sort that swaps elements that are strides apart, shrinking the
1745  * stride down until its '1' and the array is sorted.
1746  */
1747 static void sort_brw_pages(struct brw_page **array, int num)
1748 {
1749         int stride, i, j;
1750         struct brw_page *tmp;
1751
1752         if (num == 1)
1753                 return;
1754         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1755                 ;
1756
1757         do {
1758                 stride /= 3;
1759                 for (i = stride ; i < num ; i++) {
1760                         tmp = array[i];
1761                         j = i;
1762                         while (j >= stride && array[j - stride]->off > tmp->off) {
1763                                 array[j] = array[j - stride];
1764                                 j -= stride;
1765                         }
1766                         array[j] = tmp;
1767                 }
1768         } while (stride > 1);
1769 }
1770
1771 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1772 {
1773         int count = 1;
1774         int offset;
1775         int i = 0;
1776
1777         LASSERT (pages > 0);
1778         offset = pg[i]->off & ~CFS_PAGE_MASK;
1779
1780         for (;;) {
1781                 pages--;
1782                 if (pages == 0)         /* that's all */
1783                         return count;
1784
1785                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1786                         return count;   /* doesn't end on page boundary */
1787
1788                 i++;
1789                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1790                 if (offset != 0)        /* doesn't start on page boundary */
1791                         return count;
1792
1793                 count++;
1794         }
1795 }
1796
1797 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1798 {
1799         struct brw_page **ppga;
1800         int i;
1801
1802         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1803         if (ppga == NULL)
1804                 return NULL;
1805
1806         for (i = 0; i < count; i++)
1807                 ppga[i] = pga + i;
1808         return ppga;
1809 }
1810
1811 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1812 {
1813         LASSERT(ppga != NULL);
1814         OBD_FREE(ppga, sizeof(*ppga) * count);
1815 }
1816
1817 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1818                    obd_count page_count, struct brw_page *pga,
1819                    struct obd_trans_info *oti)
1820 {
1821         struct obdo *saved_oa = NULL;
1822         struct brw_page **ppga, **orig;
1823         struct obd_import *imp = class_exp2cliimp(exp);
1824         struct client_obd *cli;
1825         int rc, page_count_orig;
1826         ENTRY;
1827
1828         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1829         cli = &imp->imp_obd->u.cli;
1830
1831         if (cmd & OBD_BRW_CHECK) {
1832                 /* The caller just wants to know if there's a chance that this
1833                  * I/O can succeed */
1834
1835                 if (imp->imp_invalid)
1836                         RETURN(-EIO);
1837                 RETURN(0);
1838         }
1839
1840         /* test_brw with a failed create can trip this, maybe others. */
1841         LASSERT(cli->cl_max_pages_per_rpc);
1842
1843         rc = 0;
1844
1845         orig = ppga = osc_build_ppga(pga, page_count);
1846         if (ppga == NULL)
1847                 RETURN(-ENOMEM);
1848         page_count_orig = page_count;
1849
1850         sort_brw_pages(ppga, page_count);
1851         while (page_count) {
1852                 obd_count pages_per_brw;
1853
1854                 if (page_count > cli->cl_max_pages_per_rpc)
1855                         pages_per_brw = cli->cl_max_pages_per_rpc;
1856                 else
1857                         pages_per_brw = page_count;
1858
1859                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1860
1861                 if (saved_oa != NULL) {
1862                         /* restore previously saved oa */
1863                         *oinfo->oi_oa = *saved_oa;
1864                 } else if (page_count > pages_per_brw) {
1865                         /* save a copy of oa (brw will clobber it) */
1866                         OBDO_ALLOC(saved_oa);
1867                         if (saved_oa == NULL)
1868                                 GOTO(out, rc = -ENOMEM);
1869                         *saved_oa = *oinfo->oi_oa;
1870                 }
1871
1872                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1873                                       pages_per_brw, ppga, oinfo->oi_capa);
1874
1875                 if (rc != 0)
1876                         break;
1877
1878                 page_count -= pages_per_brw;
1879                 ppga += pages_per_brw;
1880         }
1881
1882 out:
1883         osc_release_ppga(orig, page_count_orig);
1884
1885         if (saved_oa != NULL)
1886                 OBDO_FREE(saved_oa);
1887
1888         RETURN(rc);
1889 }
1890
1891 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1892  * the dirty accounting.  Writeback completes or truncate happens before
1893  * writing starts.  Must be called with the loi lock held. */
1894 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1895                            int sent)
1896 {
1897         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1898 }
1899
1900
1901 /* This maintains the lists of pending pages to read/write for a given object
1902  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1903  * to quickly find objects that are ready to send an RPC. */
1904 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1905                          int cmd)
1906 {
1907         int optimal;
1908         ENTRY;
1909
1910         if (lop->lop_num_pending == 0)
1911                 RETURN(0);
1912
1913         /* if we have an invalid import we want to drain the queued pages
1914          * by forcing them through rpcs that immediately fail and complete
1915          * the pages.  recovery relies on this to empty the queued pages
1916          * before canceling the locks and evicting down the llite pages */
1917         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1918                 RETURN(1);
1919
1920         /* stream rpcs in queue order as long as as there is an urgent page
1921          * queued.  this is our cheap solution for good batching in the case
1922          * where writepage marks some random page in the middle of the file
1923          * as urgent because of, say, memory pressure */
1924         if (!list_empty(&lop->lop_urgent)) {
1925                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1926                 RETURN(1);
1927         }
1928         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1929         optimal = cli->cl_max_pages_per_rpc;
1930         if (cmd & OBD_BRW_WRITE) {
1931                 /* trigger a write rpc stream as long as there are dirtiers
1932                  * waiting for space.  as they're waiting, they're not going to
1933                  * create more pages to coallesce with what's waiting.. */
1934                 if (!list_empty(&cli->cl_cache_waiters)) {
1935                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1936                         RETURN(1);
1937                 }
1938                 /* +16 to avoid triggering rpcs that would want to include pages
1939                  * that are being queued but which can't be made ready until
1940                  * the queuer finishes with the page. this is a wart for
1941                  * llite::commit_write() */
1942                 optimal += 16;
1943         }
1944         if (lop->lop_num_pending >= optimal)
1945                 RETURN(1);
1946
1947         RETURN(0);
1948 }
1949
1950 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1951 {
1952         struct osc_async_page *oap;
1953         ENTRY;
1954
1955         if (list_empty(&lop->lop_urgent))
1956                 RETURN(0);
1957
1958         oap = list_entry(lop->lop_urgent.next,
1959                          struct osc_async_page, oap_urgent_item);
1960
1961         if (oap->oap_async_flags & ASYNC_HP) {
1962                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1963                 RETURN(1);
1964         }
1965
1966         RETURN(0);
1967 }
1968
/* Make the membership of @item on @list match @should_be_on: link it at the
 * tail if it is wanted but unlinked, unlink it if it is linked but unwanted,
 * and leave it alone otherwise. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on) {
                if (!linked)
                        list_add_tail(item, list);
        } else if (linked) {
                list_del_init(item);
        }
}
1977
1978 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1979  * can find pages to build into rpcs quickly */
1980 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1981 {
1982         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1983             lop_makes_hprpc(&loi->loi_read_lop)) {
1984                 /* HP rpc */
1985                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1986                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1987         } else {
1988                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1989                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1990                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1991                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1992         }
1993
1994         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1995                 loi->loi_write_lop.lop_num_pending);
1996
1997         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1998                 loi->loi_read_lop.lop_num_pending);
1999 }
2000
2001 static void lop_update_pending(struct client_obd *cli,
2002                                struct loi_oap_pages *lop, int cmd, int delta)
2003 {
2004         lop->lop_num_pending += delta;
2005         if (cmd & OBD_BRW_WRITE)
2006                 cli->cl_pending_w_pages += delta;
2007         else
2008                 cli->cl_pending_r_pages += delta;
2009 }
2010
2011 /**
2012  * this is called when a sync waiter receives an interruption.  Its job is to
2013  * get the caller woken as soon as possible.  If its page hasn't been put in an
2014  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2015  * desiring interruption which will forcefully complete the rpc once the rpc
2016  * has timed out.
2017  */
2018 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2019 {
2020         struct loi_oap_pages *lop;
2021         struct lov_oinfo *loi;
2022         int rc = -EBUSY;
2023         ENTRY;
2024
2025         LASSERT(!oap->oap_interrupted);
2026         oap->oap_interrupted = 1;
2027
2028         /* ok, it's been put in an rpc. only one oap gets a request reference */
2029         if (oap->oap_request != NULL) {
2030                 ptlrpc_mark_interrupted(oap->oap_request);
2031                 ptlrpcd_wake(oap->oap_request);
2032                 ptlrpc_req_finished(oap->oap_request);
2033                 oap->oap_request = NULL;
2034         }
2035
2036         /*
2037          * page completion may be called only if ->cpo_prep() method was
2038          * executed by osc_io_submit(), that also adds page the to pending list
2039          */
2040         if (!list_empty(&oap->oap_pending_item)) {
2041                 list_del_init(&oap->oap_pending_item);
2042                 list_del_init(&oap->oap_urgent_item);
2043
2044                 loi = oap->oap_loi;
2045                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2046                         &loi->loi_write_lop : &loi->loi_read_lop;
2047                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2048                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2049                 rc = oap->oap_caller_ops->ap_completion(env,
2050                                           oap->oap_caller_data,
2051                                           oap->oap_cmd, NULL, -EINTR);
2052         }
2053
2054         RETURN(rc);
2055 }
2056
2057 /* this is trying to propogate async writeback errors back up to the
2058  * application.  As an async write fails we record the error code for later if
2059  * the app does an fsync.  As long as errors persist we force future rpcs to be
2060  * sync so that the app can get a sync error and break the cycle of queueing
2061  * pages for which writeback will fail. */
2062 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2063                            int rc)
2064 {
2065         if (rc) {
2066                 if (!ar->ar_rc)
2067                         ar->ar_rc = rc;
2068
2069                 ar->ar_force_sync = 1;
2070                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2071                 return;
2072
2073         }
2074
2075         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2076                 ar->ar_force_sync = 0;
2077 }
2078
2079 void osc_oap_to_pending(struct osc_async_page *oap)
2080 {
2081         struct loi_oap_pages *lop;
2082
2083         if (oap->oap_cmd & OBD_BRW_WRITE)
2084                 lop = &oap->oap_loi->loi_write_lop;
2085         else
2086                 lop = &oap->oap_loi->loi_read_lop;
2087
2088         if (oap->oap_async_flags & ASYNC_HP)
2089                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2090         else if (oap->oap_async_flags & ASYNC_URGENT)
2091                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2092         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2093         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2094 }
2095
2096 /* this must be called holding the loi list lock to give coverage to exit_cache,
2097  * async_flag maintenance, and oap_request */
2098 static void osc_ap_completion(const struct lu_env *env,
2099                               struct client_obd *cli, struct obdo *oa,
2100                               struct osc_async_page *oap, int sent, int rc)
2101 {
2102         __u64 xid = 0;
2103
2104         ENTRY;
2105         if (oap->oap_request != NULL) {
2106                 xid = ptlrpc_req_xid(oap->oap_request);
2107                 ptlrpc_req_finished(oap->oap_request);
2108                 oap->oap_request = NULL;
2109         }
2110
2111         spin_lock(&oap->oap_lock);
2112         oap->oap_async_flags = 0;
2113         spin_unlock(&oap->oap_lock);
2114         oap->oap_interrupted = 0;
2115
2116         if (oap->oap_cmd & OBD_BRW_WRITE) {
2117                 osc_process_ar(&cli->cl_ar, xid, rc);
2118                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2119         }
2120
2121         if (rc == 0 && oa != NULL) {
2122                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2123                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2124                 if (oa->o_valid & OBD_MD_FLMTIME)
2125                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2126                 if (oa->o_valid & OBD_MD_FLATIME)
2127                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2128                 if (oa->o_valid & OBD_MD_FLCTIME)
2129                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2130         }
2131
2132         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2133                                                 oap->oap_cmd, oa, rc);
2134
2135         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2136          * I/O on the page could start, but OSC calls it under lock
2137          * and thus we can add oap back to pending safely */
2138         if (rc)
2139                 /* upper layer wants to leave the page on pending queue */
2140                 osc_oap_to_pending(oap);
2141         else
2142                 osc_exit_cache(cli, oap, sent);
2143         EXIT;
2144 }
2145
2146 static int brw_interpret(const struct lu_env *env,
2147                          struct ptlrpc_request *req, void *data, int rc)
2148 {
2149         struct osc_brw_async_args *aa = data;
2150         struct client_obd *cli;
2151         int async;
2152         ENTRY;
2153
2154         rc = osc_brw_fini_request(req, rc);
2155         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2156         if (osc_recoverable_error(rc)) {
2157                 rc = osc_brw_redo_request(req, aa);
2158                 if (rc == 0)
2159                         RETURN(0);
2160         }
2161
2162         if (aa->aa_ocapa) {
2163                 capa_put(aa->aa_ocapa);
2164                 aa->aa_ocapa = NULL;
2165         }
2166
2167         cli = aa->aa_cli;
2168
2169         client_obd_list_lock(&cli->cl_loi_list_lock);
2170
2171         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2172          * is called so we know whether to go to sync BRWs or wait for more
2173          * RPCs to complete */
2174         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2175                 cli->cl_w_in_flight--;
2176         else
2177                 cli->cl_r_in_flight--;
2178
2179         async = list_empty(&aa->aa_oaps);
2180         if (!async) { /* from osc_send_oap_rpc() */
2181                 struct osc_async_page *oap, *tmp;
2182                 /* the caller may re-use the oap after the completion call so
2183                  * we need to clean it up a little */
2184                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2185                         list_del_init(&oap->oap_rpc_item);
2186                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2187                 }
2188                 OBDO_FREE(aa->aa_oa);
2189         } else { /* from async_internal() */
2190                 int i;
2191                 for (i = 0; i < aa->aa_page_count; i++)
2192                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2193
2194                 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2195                         OBDO_FREE(aa->aa_oa);
2196         }
2197         osc_wake_cache_waiters(cli);
2198         osc_check_rpcs(env, cli);
2199         client_obd_list_unlock(&cli->cl_loi_list_lock);
2200         if (!async)
2201                 cl_req_completion(env, aa->aa_clerq, rc);
2202         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2203         RETURN(rc);
2204 }
2205
2206 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2207                                             struct client_obd *cli,
2208                                             struct list_head *rpc_list,
2209                                             int page_count, int cmd)
2210 {
2211         struct ptlrpc_request *req;
2212         struct brw_page **pga = NULL;
2213         struct osc_brw_async_args *aa;
2214         struct obdo *oa = NULL;
2215         const struct obd_async_page_ops *ops = NULL;
2216         void *caller_data = NULL;
2217         struct osc_async_page *oap;
2218         struct osc_async_page *tmp;
2219         struct ost_body *body;
2220         struct cl_req *clerq = NULL;
2221         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2222         struct ldlm_lock *lock = NULL;
2223         struct cl_req_attr crattr;
2224         int i, rc;
2225
2226         ENTRY;
2227         LASSERT(!list_empty(rpc_list));
2228
2229         memset(&crattr, 0, sizeof crattr);
2230         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2231         if (pga == NULL)
2232                 GOTO(out, req = ERR_PTR(-ENOMEM));
2233
2234         OBDO_ALLOC(oa);
2235         if (oa == NULL)
2236                 GOTO(out, req = ERR_PTR(-ENOMEM));
2237
2238         i = 0;
2239         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2240                 struct cl_page *page = osc_oap2cl_page(oap);
2241                 if (ops == NULL) {
2242                         ops = oap->oap_caller_ops;
2243                         caller_data = oap->oap_caller_data;
2244
2245                         clerq = cl_req_alloc(env, page, crt,
2246                                              1 /* only 1-object rpcs for
2247                                                 * now */);
2248                         if (IS_ERR(clerq))
2249                                 GOTO(out, req = (void *)clerq);
2250                         lock = oap->oap_ldlm_lock;
2251                 }
2252                 pga[i] = &oap->oap_brw_page;
2253                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2254                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2255                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2256                 i++;
2257                 cl_req_page_add(env, clerq, page);
2258         }
2259
2260         /* always get the data for the obdo for the rpc */
2261         LASSERT(ops != NULL);
2262         crattr.cra_oa = oa;
2263         crattr.cra_capa = NULL;
2264         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2265         if (lock) {
2266                 oa->o_handle = lock->l_remote_handle;
2267                 oa->o_valid |= OBD_MD_FLHANDLE;
2268         }
2269
2270         rc = cl_req_prep(env, clerq);
2271         if (rc != 0) {
2272                 CERROR("cl_req_prep failed: %d\n", rc);
2273                 GOTO(out, req = ERR_PTR(rc));
2274         }
2275
2276         sort_brw_pages(pga, page_count);
2277         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2278                                   pga, &req, crattr.cra_capa, 1);
2279         if (rc != 0) {
2280                 CERROR("prep_req failed: %d\n", rc);
2281                 GOTO(out, req = ERR_PTR(rc));
2282         }
2283
2284         /* Need to update the timestamps after the request is built in case
2285          * we race with setattr (locally or in queue at OST).  If OST gets
2286          * later setattr before earlier BRW (as determined by the request xid),
2287          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2288          * way to do this in a single call.  bug 10150 */
2289         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2290         cl_req_attr_set(env, clerq, &crattr,
2291                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2292
2293         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2294         aa = ptlrpc_req_async_args(req);
2295         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2296         list_splice(rpc_list, &aa->aa_oaps);
2297         CFS_INIT_LIST_HEAD(rpc_list);
2298         aa->aa_clerq = clerq;
2299 out:
2300         capa_put(crattr.cra_capa);
2301         if (IS_ERR(req)) {
2302                 if (oa)
2303                         OBDO_FREE(oa);
2304                 if (pga)
2305                         OBD_FREE(pga, sizeof(*pga) * page_count);
2306                 /* this should happen rarely and is pretty bad, it makes the
2307                  * pending list not follow the dirty order */
2308                 client_obd_list_lock(&cli->cl_loi_list_lock);
2309                 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2310                         list_del_init(&oap->oap_rpc_item);
2311
2312                         /* queued sync pages can be torn down while the pages
2313                          * were between the pending list and the rpc */
2314                         if (oap->oap_interrupted) {
2315                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2316                                 osc_ap_completion(env, cli, NULL, oap, 0,
2317                                                   oap->oap_count);
2318                                 continue;
2319                         }
2320                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2321                 }
2322                 if (clerq && !IS_ERR(clerq))
2323                         cl_req_completion(env, clerq, PTR_ERR(req));
2324         }
2325         RETURN(req);
2326 }
2327
2328 /**
2329  * prepare pages for ASYNC io and put pages in send queue.
2330  *
2331  * \param cli -
2332  * \param loi -
2333  * \param cmd - OBD_BRW_* macroses
2334  * \param lop - pending pages
2335  *
2336  * \return zero if pages successfully add to send queue.
2337  * \return not zere if error occurring.
2338  */
2339 static int
2340 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2341                  struct lov_oinfo *loi,
2342                  int cmd, struct loi_oap_pages *lop)
2343 {
2344         struct ptlrpc_request *req;
2345         obd_count page_count = 0;
2346         struct osc_async_page *oap = NULL, *tmp;
2347         struct osc_brw_async_args *aa;
2348         const struct obd_async_page_ops *ops;
2349         CFS_LIST_HEAD(rpc_list);
2350         CFS_LIST_HEAD(tmp_list);
2351         unsigned int ending_offset;
2352         unsigned  starting_offset = 0;
2353         int srvlock = 0;
2354         struct cl_object *clob = NULL;
2355         ENTRY;
2356
2357         /* ASYNC_HP pages first. At present, when the lock the pages is
2358          * to be canceled, the pages covered by the lock will be sent out
2359          * with ASYNC_HP. We have to send out them as soon as possible. */
2360         list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2361                 if (oap->oap_async_flags & ASYNC_HP) 
2362                         list_move(&oap->oap_pending_item, &tmp_list);
2363                 else
2364                         list_move_tail(&oap->oap_pending_item, &tmp_list);
2365                 if (++page_count >= cli->cl_max_pages_per_rpc)
2366                         break;
2367         }
2368
2369         list_splice(&tmp_list, &lop->lop_pending);
2370         page_count = 0;
2371
2372         /* first we find the pages we're allowed to work with */
2373         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2374                                  oap_pending_item) {
2375                 ops = oap->oap_caller_ops;
2376
2377                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2378                          "magic 0x%x\n", oap, oap->oap_magic);
2379
2380                 if (clob == NULL) {
2381                         /* pin object in memory, so that completion call-backs
2382                          * can be safely called under client_obd_list lock. */
2383                         clob = osc_oap2cl_page(oap)->cp_obj;
2384                         cl_object_get(clob);
2385                 }
2386
2387                 if (page_count != 0 &&
2388                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2389                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2390                                " oap %p, page %p, srvlock %u\n",
2391                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2392                         break;
2393                 }
2394
2395                 /* If there is a gap at the start of this page, it can't merge
2396                  * with any previous page, so we'll hand the network a
2397                  * "fragmented" page array that it can't transfer in 1 RDMA */
2398                 if (page_count != 0 && oap->oap_page_off != 0)
2399                         break;
2400
2401                 /* in llite being 'ready' equates to the page being locked
2402                  * until completion unlocks it.  commit_write submits a page
2403                  * as not ready because its unlock will happen unconditionally
2404                  * as the call returns.  if we race with commit_write giving
2405                  * us that page we dont' want to create a hole in the page
2406                  * stream, so we stop and leave the rpc to be fired by
2407                  * another dirtier or kupdated interval (the not ready page
2408                  * will still be on the dirty list).  we could call in
2409                  * at the end of ll_file_write to process the queue again. */
2410                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2411                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2412                                                     cmd);
2413                         if (rc < 0)
2414                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2415                                                 "instead of ready\n", oap,
2416                                                 oap->oap_page, rc);
2417                         switch (rc) {
2418                         case -EAGAIN:
2419                                 /* llite is telling us that the page is still
2420                                  * in commit_write and that we should try
2421                                  * and put it in an rpc again later.  we
2422                                  * break out of the loop so we don't create
2423                                  * a hole in the sequence of pages in the rpc
2424                                  * stream.*/
2425                                 oap = NULL;
2426                                 break;
2427                         case -EINTR:
2428                                 /* the io isn't needed.. tell the checks
2429                                  * below to complete the rpc with EINTR */
2430                                 spin_lock(&oap->oap_lock);
2431                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2432                                 spin_unlock(&oap->oap_lock);
2433                                 oap->oap_count = -EINTR;
2434                                 break;
2435                         case 0:
2436                                 spin_lock(&oap->oap_lock);
2437                                 oap->oap_async_flags |= ASYNC_READY;
2438                                 spin_unlock(&oap->oap_lock);
2439                                 break;
2440                         default:
2441                                 LASSERTF(0, "oap %p page %p returned %d "
2442                                             "from make_ready\n", oap,
2443                                             oap->oap_page, rc);
2444                                 break;
2445                         }
2446                 }
2447                 if (oap == NULL)
2448                         break;
2449                 /*
2450                  * Page submitted for IO has to be locked. Either by
2451                  * ->ap_make_ready() or by higher layers.
2452                  */
2453 #if defined(__KERNEL__) && defined(__linux__)
2454                 {
2455                         struct cl_page *page;
2456
2457                         page = osc_oap2cl_page(oap);
2458
2459                         if (page->cp_type == CPT_CACHEABLE &&
2460                             !(PageLocked(oap->oap_page) &&
2461                               (CheckWriteback(oap->oap_page, cmd)))) {
2462                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2463                                        oap->oap_page,
2464                                        (long)oap->oap_page->flags,
2465                                        oap->oap_async_flags);
2466                                 LBUG();
2467                         }
2468                 }
2469 #endif
2470
2471                 /* take the page out of our book-keeping */
2472                 list_del_init(&oap->oap_pending_item);
2473                 lop_update_pending(cli, lop, cmd, -1);
2474                 list_del_init(&oap->oap_urgent_item);
2475
2476                 if (page_count == 0)
2477                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2478                                           (PTLRPC_MAX_BRW_SIZE - 1);
2479
2480                 /* ask the caller for the size of the io as the rpc leaves. */
2481                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2482                         oap->oap_count =
2483                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2484                                                       cmd);
2485                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2486                 }
2487                 if (oap->oap_count <= 0) {
2488                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2489                                oap->oap_count);
2490                         osc_ap_completion(env, cli, NULL,
2491                                           oap, 0, oap->oap_count);
2492                         continue;
2493                 }
2494
2495                 /* now put the page back in our accounting */
2496                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2497                 if (page_count == 0)
2498                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2499                 if (++page_count >= cli->cl_max_pages_per_rpc)
2500                         break;
2501
2502                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2503                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2504                  * have the same alignment as the initial writes that allocated
2505                  * extents on the server. */
2506                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2507                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2508                 if (ending_offset == 0)
2509                         break;
2510
2511                 /* If there is a gap at the end of this page, it can't merge
2512                  * with any subsequent pages, so we'll hand the network a
2513                  * "fragmented" page array that it can't transfer in 1 RDMA */
2514                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2515                         break;
2516         }
2517
2518         osc_wake_cache_waiters(cli);
2519
2520         loi_list_maint(cli, loi);
2521
2522         client_obd_list_unlock(&cli->cl_loi_list_lock);
2523
2524         if (clob != NULL)
2525                 cl_object_put(env, clob);
2526
2527         if (page_count == 0) {
2528                 client_obd_list_lock(&cli->cl_loi_list_lock);
2529                 RETURN(0);
2530         }
2531
2532         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2533         if (IS_ERR(req)) {
2534                 LASSERT(list_empty(&rpc_list));
2535                 loi_list_maint(cli, loi);
2536                 RETURN(PTR_ERR(req));
2537         }
2538
2539         aa = ptlrpc_req_async_args(req);
2540
2541         if (cmd == OBD_BRW_READ) {
2542                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2543                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2544                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2545                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2546         } else {
2547                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2548                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2549                                  cli->cl_w_in_flight);
2550                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2551                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2552         }
2553         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2554
2555         client_obd_list_lock(&cli->cl_loi_list_lock);
2556
2557         if (cmd == OBD_BRW_READ)
2558                 cli->cl_r_in_flight++;
2559         else
2560                 cli->cl_w_in_flight++;
2561
2562         /* queued sync pages can be torn down while the pages
2563          * were between the pending list and the rpc */
2564         tmp = NULL;
2565         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2566                 /* only one oap gets a request reference */
2567                 if (tmp == NULL)
2568                         tmp = oap;
2569                 if (oap->oap_interrupted && !req->rq_intr) {
2570                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2571                                oap, req);
2572                         ptlrpc_mark_interrupted(req);
2573                 }
2574         }
2575         if (tmp != NULL)
2576                 tmp->oap_request = ptlrpc_request_addref(req);
2577
2578         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2579                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2580
2581         req->rq_interpret_reply = brw_interpret;
2582         ptlrpcd_add_req(req, PSCOPE_BRW);
2583         RETURN(1);
2584 }
2585
/* Emit a D_INODE debug line for LOI: whether it sits on a ready (or hp
 * ready) list, followed by "pending count:urgent list non-empty" for the
 * write and the read queues, followed by the caller's STR/args. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_ready_item) ||                    \
               !list_empty(&(LOI)->loi_hp_ready_item),                   \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2595
2596 /* This is called by osc_check_rpcs() to find which objects have pages that
2597  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2598 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2599 {
2600         ENTRY;
2601
2602         /* First return objects that have blocked locks so that they
2603          * will be flushed quickly and other clients can get the lock,
2604          * then objects which have pages ready to be stuffed into RPCs */
2605         if (!list_empty(&cli->cl_loi_hp_ready_list))
2606                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2607                                   struct lov_oinfo, loi_hp_ready_item));
2608         if (!list_empty(&cli->cl_loi_ready_list))
2609                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2610                                   struct lov_oinfo, loi_ready_item));
2611
2612         /* then if we have cache waiters, return all objects with queued
2613          * writes.  This is especially important when many small files
2614          * have filled up the cache and not been fired into rpcs because
2615          * they don't pass the nr_pending/object threshhold */
2616         if (!list_empty(&cli->cl_cache_waiters) &&
2617             !list_empty(&cli->cl_loi_write_list))
2618                 RETURN(list_entry(cli->cl_loi_write_list.next,
2619                                   struct lov_oinfo, loi_write_item));
2620
2621         /* then return all queued objects when we have an invalid import
2622          * so that they get flushed */
2623         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2624                 if (!list_empty(&cli->cl_loi_write_list))
2625                         RETURN(list_entry(cli->cl_loi_write_list.next,
2626                                           struct lov_oinfo, loi_write_item));
2627                 if (!list_empty(&cli->cl_loi_read_list))
2628                         RETURN(list_entry(cli->cl_loi_read_list.next,
2629                                           struct lov_oinfo, loi_read_item));
2630         }
2631         RETURN(NULL);
2632 }
2633
2634 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2635 {
2636         struct osc_async_page *oap;
2637         int hprpc = 0;
2638
2639         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2640                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2641                                  struct osc_async_page, oap_urgent_item);
2642                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2643         }
2644
2645         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2646                 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2647                                  struct osc_async_page, oap_urgent_item);
2648                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2649         }
2650
2651         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2652 }
2653
2654 /* called with the loi list lock held */
2655 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2656 {
2657         struct lov_oinfo *loi;
2658         int rc = 0, race_counter = 0;
2659         ENTRY;
2660
2661         while ((loi = osc_next_loi(cli)) != NULL) {
2662                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2663
2664                 if (osc_max_rpc_in_flight(cli, loi))
2665                         break;
2666
2667                 /* attempt some read/write balancing by alternating between
2668                  * reads and writes in an object.  The makes_rpc checks here
2669                  * would be redundant if we were getting read/write work items
2670                  * instead of objects.  we don't want send_oap_rpc to drain a
2671                  * partial read pending queue when we're given this object to
2672                  * do io on writes while there are cache waiters */
2673                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2674                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2675                                               &loi->loi_write_lop);
2676                         if (rc < 0) {
2677                                 CERROR("Write request failed with %d\n", rc);
2678
2679                                 /* osc_send_oap_rpc failed, mostly because of
2680                                  * memory pressure.
2681                                  *
2682                                  * It can't break here, because if:
2683                                  *  - a page was submitted by osc_io_submit, so
2684                                  *    page locked;
2685                                  *  - no request in flight
2686                                  *  - no subsequent request
2687                                  * The system will be in live-lock state,
2688                                  * because there is no chance to call
2689                                  * osc_io_unplug() and osc_check_rpcs() any
2690                                  * more. pdflush can't help in this case,
2691                                  * because it might be blocked at grabbing
2692                                  * the page lock as we mentioned.
2693                                  *
2694                                  * Anyway, continue to drain pages. */
2695                                 /* break; */
2696                         }
2697
2698                         if (rc > 0)
2699                                 race_counter = 0;
2700                         else
2701                                 race_counter++;
2702                 }
2703                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2704                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2705                                               &loi->loi_read_lop);
2706                         if (rc < 0)
2707                                 CERROR("Read request failed with %d\n", rc);
2708
2709                         if (rc > 0)
2710                                 race_counter = 0;
2711                         else
2712                                 race_counter++;
2713                 }
2714
2715                 /* attempt some inter-object balancing by issueing rpcs
2716                  * for each object in turn */
2717                 if (!list_empty(&loi->loi_hp_ready_item))
2718                         list_del_init(&loi->loi_hp_ready_item);
2719                 if (!list_empty(&loi->loi_ready_item))
2720                         list_del_init(&loi->loi_ready_item);
2721                 if (!list_empty(&loi->loi_write_item))
2722                         list_del_init(&loi->loi_write_item);
2723                 if (!list_empty(&loi->loi_read_item))
2724                         list_del_init(&loi->loi_read_item);
2725
2726                 loi_list_maint(cli, loi);
2727
2728                 /* send_oap_rpc fails with 0 when make_ready tells it to
2729                  * back off.  llite's make_ready does this when it tries
2730                  * to lock a page queued for write that is already locked.
2731                  * we want to try sending rpcs from many objects, but we
2732                  * don't want to spin failing with 0.  */
2733                 if (race_counter == 10)
2734                         break;
2735         }
2736         EXIT;
2737 }
2738
2739 /* we're trying to queue a page in the osc so we're subject to the
2740  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2741  * If the osc's queued pages are already at that limit, then we want to sleep
2742  * until there is space in the osc's queue for us.  We also may be waiting for
2743  * write credits from the OST if there are RPCs in flight that may return some
2744  * before we fall back to sync writes.
2745  *
2746  * We need this know our allocation was granted in the presence of signals */
2747 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2748 {
2749         int rc;
2750         ENTRY;
2751         client_obd_list_lock(&cli->cl_loi_list_lock);
2752         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2753         client_obd_list_unlock(&cli->cl_loi_list_lock);
2754         RETURN(rc);
2755 };
2756
2757 /**
2758  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2759  * is available.
2760  */
2761 int osc_enter_cache_try(const struct lu_env *env,
2762                         struct client_obd *cli, struct lov_oinfo *loi,
2763                         struct osc_async_page *oap, int transient)
2764 {
2765         int has_grant;
2766
2767         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2768         if (has_grant) {
2769                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2770                 if (transient) {
2771                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2772                         atomic_inc(&obd_dirty_transit_pages);
2773                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2774                 }
2775         }
2776         return has_grant;
2777 }
2778
2779 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2780  * grant or cache space. */
2781 static int osc_enter_cache(const struct lu_env *env,
2782                            struct client_obd *cli, struct lov_oinfo *loi,
2783                            struct osc_async_page *oap)
2784 {
2785         struct osc_cache_waiter ocw;
2786         struct l_wait_info lwi = { 0 };
2787
2788         ENTRY;
2789
2790         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2791                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2792                cli->cl_dirty_max, obd_max_dirty_pages,
2793                cli->cl_lost_grant, cli->cl_avail_grant);
2794
2795         /* force the caller to try sync io.  this can jump the list
2796          * of queued writes and create a discontiguous rpc stream */
2797         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2798             loi->loi_ar.ar_force_sync)
2799                 RETURN(-EDQUOT);
2800
2801         /* Hopefully normal case - cache space and write credits available */
2802         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2803             atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2804             osc_enter_cache_try(env, cli, loi, oap, 0))
2805                 RETURN(0);
2806
2807         /* Make sure that there are write rpcs in flight to wait for.  This
2808          * is a little silly as this object may not have any pending but
2809          * other objects sure might. */
2810         if (cli->cl_w_in_flight) {
2811                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2812                 cfs_waitq_init(&ocw.ocw_waitq);
2813                 ocw.ocw_oap = oap;
2814                 ocw.ocw_rc = 0;
2815
2816                 loi_list_maint(cli, loi);
2817                 osc_check_rpcs(env, cli);
2818                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2819
2820                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2821                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2822
2823                 client_obd_list_lock(&cli->cl_loi_list_lock);
2824                 if (!list_empty(&ocw.ocw_entry)) {
2825                         list_del(&ocw.ocw_entry);
2826                         RETURN(-EINTR);
2827                 }
2828                 RETURN(ocw.ocw_rc);
2829         }
2830
2831         RETURN(-EDQUOT);
2832 }
2833
2834
2835 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2836                         struct lov_oinfo *loi, cfs_page_t *page,
2837                         obd_off offset, const struct obd_async_page_ops *ops,
2838                         void *data, void **res, int nocache,
2839                         struct lustre_handle *lockh)
2840 {
2841         struct osc_async_page *oap;
2842
2843         ENTRY;
2844
2845         if (!page)
2846                 return size_round(sizeof(*oap));
2847
2848         oap = *res;
2849         oap->oap_magic = OAP_MAGIC;
2850         oap->oap_cli = &exp->exp_obd->u.cli;
2851         oap->oap_loi = loi;
2852
2853         oap->oap_caller_ops = ops;
2854         oap->oap_caller_data = data;
2855
2856         oap->oap_page = page;
2857         oap->oap_obj_off = offset;
2858         if (!client_is_remote(exp) &&
2859             cfs_capable(CFS_CAP_SYS_RESOURCE))
2860                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2861
2862         LASSERT(!(offset & ~CFS_PAGE_MASK));
2863
2864         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2865         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2866         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2867         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2868
2869         spin_lock_init(&oap->oap_lock);
2870         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2871         RETURN(0);
2872 }
2873
2874 struct osc_async_page *oap_from_cookie(void *cookie)
2875 {
2876         struct osc_async_page *oap = cookie;
2877         if (oap->oap_magic != OAP_MAGIC)
2878                 return ERR_PTR(-EINVAL);
2879         return oap;
2880 };
2881
2882 int osc_queue_async_io(const struct lu_env *env,
2883                        struct obd_export *exp, struct lov_stripe_md *lsm,
2884                        struct lov_oinfo *loi, void *cookie,
2885                        int cmd, obd_off off, int count,
2886                        obd_flag brw_flags, enum async_flags async_flags)
2887 {
2888         struct client_obd *cli = &exp->exp_obd->u.cli;
2889         struct osc_async_page *oap;
2890         int rc = 0;
2891         ENTRY;
2892
2893         oap = oap_from_cookie(cookie);
2894         if (IS_ERR(oap))
2895                 RETURN(PTR_ERR(oap));
2896
2897         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2898                 RETURN(-EIO);
2899
2900         if (!list_empty(&oap->oap_pending_item) ||
2901             !list_empty(&oap->oap_urgent_item) ||
2902             !list_empty(&oap->oap_rpc_item))
2903                 RETURN(-EBUSY);
2904
2905         /* check if the file's owner/group is over quota */
2906         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2907                 struct cl_object *obj;
2908                 struct cl_attr    attr; /* XXX put attr into thread info */
2909                 unsigned int qid[MAXQUOTAS];
2910
2911                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2912
2913                 cl_object_attr_lock(obj);
2914                 rc = cl_object_attr_get(env, obj, &attr);
2915                 cl_object_attr_unlock(obj);
2916
2917                 qid[USRQUOTA] = attr.cat_uid;
2918                 qid[GRPQUOTA] = attr.cat_gid;
2919                 if (rc == 0 &&
2920                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2921                         rc = -EDQUOT;
2922                 if (rc)
2923                         RETURN(rc);
2924         }
2925
2926         if (loi == NULL)
2927                 loi = lsm->lsm_oinfo[0];
2928
2929         client_obd_list_lock(&cli->cl_loi_list_lock);
2930
2931         LASSERT(off + count <= CFS_PAGE_SIZE);
2932         oap->oap_cmd = cmd;
2933         oap->oap_page_off = off;
2934         oap->oap_count = count;
2935         oap->oap_brw_flags = brw_flags;
2936         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2937         if (libcfs_memory_pressure_get())
2938                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2939         spin_lock(&oap->oap_lock);
2940         oap->oap_async_flags = async_flags;
2941         spin_unlock(&oap->oap_lock);
2942
2943         if (cmd & OBD_BRW_WRITE) {
2944                 rc = osc_enter_cache(env, cli, loi, oap);
2945                 if (rc) {
2946                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2947                         RETURN(rc);
2948                 }
2949         }
2950
2951         osc_oap_to_pending(oap);
2952         loi_list_maint(cli, loi);
2953
2954         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2955                   cmd);
2956
2957         osc_check_rpcs(env, cli);
2958         client_obd_list_unlock(&cli->cl_loi_list_lock);
2959
2960         RETURN(0);
2961 }
2962
/* True when \a flag is clear in \a was and set in \a now, i.e. the flag is
 * being newly set; aka (~was & now & flag), but this is more clear :)
 * Every argument is parenthesised so that compound expressions such as
 * "a | b" are evaluated as a unit. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2965
2966 int osc_set_async_flags_base(struct client_obd *cli,
2967                              struct lov_oinfo *loi, struct osc_async_page *oap,
2968                              obd_flag async_flags)
2969 {
2970         struct loi_oap_pages *lop;
2971         int flags = 0;
2972         ENTRY;
2973
2974         LASSERT(!list_empty(&oap->oap_pending_item));
2975
2976         if (oap->oap_cmd & OBD_BRW_WRITE) {
2977                 lop = &loi->loi_write_lop;
2978         } else {
2979                 lop = &loi->loi_read_lop;
2980         }
2981
2982         if ((oap->oap_async_flags & async_flags) == async_flags)
2983                 RETURN(0);
2984
2985         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2986                 flags |= ASYNC_READY;
2987
2988         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2989             list_empty(&oap->oap_rpc_item)) {
2990                 if (oap->oap_async_flags & ASYNC_HP)
2991                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2992                 else
2993                         list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2994                 flags |= ASYNC_URGENT;
2995                 loi_list_maint(cli, loi);
2996         }
2997         spin_lock(&oap->oap_lock);
2998         oap->oap_async_flags |= flags;
2999         spin_unlock(&oap->oap_lock);
3000
3001         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3002                         oap->oap_async_flags);
3003         RETURN(0);
3004 }
3005
3006 int osc_teardown_async_page(struct obd_export *exp,
3007                             struct lov_stripe_md *lsm,
3008                             struct lov_oinfo *loi, void *cookie)
3009 {
3010         struct client_obd *cli = &exp->exp_obd->u.cli;
3011         struct loi_oap_pages *lop;
3012         struct osc_async_page *oap;
3013         int rc = 0;
3014         ENTRY;
3015
3016         oap = oap_from_cookie(cookie);
3017         if (IS_ERR(oap))
3018                 RETURN(PTR_ERR(oap));
3019
3020         if (loi == NULL)
3021                 loi = lsm->lsm_oinfo[0];
3022
3023         if (oap->oap_cmd & OBD_BRW_WRITE) {
3024                 lop = &loi->loi_write_lop;
3025         } else {
3026                 lop = &loi->loi_read_lop;
3027         }
3028
3029         client_obd_list_lock(&cli->cl_loi_list_lock);
3030
3031         if (!list_empty(&oap->oap_rpc_item))
3032                 GOTO(out, rc = -EBUSY);
3033
3034         osc_exit_cache(cli, oap, 0);
3035         osc_wake_cache_waiters(cli);
3036
3037         if (!list_empty(&oap->oap_urgent_item)) {
3038                 list_del_init(&oap->oap_urgent_item);
3039                 spin_lock(&oap->oap_lock);
3040                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3041                 spin_unlock(&oap->oap_lock);
3042         }
3043         if (!list_empty(&oap->oap_pending_item)) {
3044                 list_del_init(&oap->oap_pending_item);
3045                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3046         }
3047         loi_list_maint(cli, loi);
3048         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3049 out:
3050         client_obd_list_unlock(&cli->cl_loi_list_lock);
3051         RETURN(rc);
3052 }
3053
3054 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3055                                          struct ldlm_enqueue_info *einfo,
3056                                          int flags)
3057 {
3058         void *data = einfo->ei_cbdata;
3059
3060         LASSERT(lock != NULL);
3061         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3062         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3063         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3064         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3065
3066         lock_res_and_lock(lock);
3067         spin_lock(&osc_ast_guard);
3068         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3069         lock->l_ast_data = data;
3070         spin_unlock(&osc_ast_guard);
3071         unlock_res_and_lock(lock);
3072 }
3073
3074 static void osc_set_data_with_check(struct lustre_handle *lockh,
3075                                     struct ldlm_enqueue_info *einfo,
3076                                     int flags)
3077 {
3078         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3079
3080         if (lock != NULL) {
3081                 osc_set_lock_data_with_check(lock, einfo, flags);
3082                 LDLM_LOCK_PUT(lock);
3083         } else
3084                 CERROR("lockh %p, data %p - client evicted?\n",
3085                        lockh, einfo->ei_cbdata);
3086 }
3087
3088 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3089                              ldlm_iterator_t replace, void *data)
3090 {
3091         struct ldlm_res_id res_id;
3092         struct obd_device *obd = class_exp2obd(exp);
3093
3094         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3095         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3096         return 0;
3097 }
3098
3099 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3100                             obd_enqueue_update_f upcall, void *cookie,
3101                             int *flags, int rc)
3102 {
3103         int intent = *flags & LDLM_FL_HAS_INTENT;
3104         ENTRY;
3105
3106         if (intent) {
3107                 /* The request was created before ldlm_cli_enqueue call. */
3108                 if (rc == ELDLM_LOCK_ABORTED) {
3109                         struct ldlm_reply *rep;
3110                         rep = req_capsule_server_get(&req->rq_pill,
3111                                                      &RMF_DLM_REP);
3112
3113                         LASSERT(rep != NULL);
3114                         if (rep->lock_policy_res1)
3115                                 rc = rep->lock_policy_res1;
3116                 }
3117         }
3118
3119         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3120                 *flags |= LDLM_FL_LVB_READY;
3121                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3122                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3123         }
3124
3125         /* Call the update callback. */
3126         rc = (*upcall)(cookie, rc);
3127         RETURN(rc);
3128 }
3129
3130 static int osc_enqueue_interpret(const struct lu_env *env,
3131                                  struct ptlrpc_request *req,
3132                                  struct osc_enqueue_args *aa, int rc)
3133 {
3134         struct ldlm_lock *lock;
3135         struct lustre_handle handle;
3136         __u32 mode;
3137
3138         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3139          * might be freed anytime after lock upcall has been called. */
3140         lustre_handle_copy(&handle, aa->oa_lockh);
3141         mode = aa->oa_ei->ei_mode;
3142
3143         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3144          * be valid. */
3145         lock = ldlm_handle2lock(&handle);
3146
3147         /* Take an additional reference so that a blocking AST that
3148          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3149          * to arrive after an upcall has been executed by
3150          * osc_enqueue_fini(). */
3151         ldlm_lock_addref(&handle, mode);
3152
3153         /* Complete obtaining the lock procedure. */
3154         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3155                                    mode, aa->oa_flags, aa->oa_lvb,
3156                                    sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
3157                                    &handle, rc);
3158         /* Complete osc stuff. */
3159         rc = osc_enqueue_fini(req, aa->oa_lvb,
3160                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3161
3162         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3163
3164         /* Release the lock for async request. */
3165         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3166                 /*
3167                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3168                  * not already released by
3169                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3170                  */
3171                 ldlm_lock_decref(&handle, mode);
3172
3173         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3174                  aa->oa_lockh, req, aa);
3175         ldlm_lock_decref(&handle, mode);
3176         LDLM_LOCK_PUT(lock);
3177         return rc;
3178 }
3179
3180 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3181                         struct lov_oinfo *loi, int flags,
3182                         struct ost_lvb *lvb, __u32 mode, int rc)
3183 {
3184         if (rc == ELDLM_OK) {
3185                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3186                 __u64 tmp;
3187
3188                 LASSERT(lock != NULL);
3189                 loi->loi_lvb = *lvb;
3190                 tmp = loi->loi_lvb.lvb_size;
3191                 /* Extend KMS up to the end of this lock and no further
3192                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3193                 if (tmp > lock->l_policy_data.l_extent.end)
3194                         tmp = lock->l_policy_data.l_extent.end + 1;
3195                 if (tmp >= loi->loi_kms) {
3196                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3197                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3198                         loi_kms_set(loi, tmp);
3199                 } else {
3200                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3201                                    LPU64"; leaving kms="LPU64", end="LPU64,
3202                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3203                                    lock->l_policy_data.l_extent.end);
3204                 }
3205                 ldlm_lock_allow_match(lock);
3206                 LDLM_LOCK_PUT(lock);
3207         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3208                 loi->loi_lvb = *lvb;
3209                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3210                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3211                 rc = ELDLM_OK;
3212         }
3213 }
3214 EXPORT_SYMBOL(osc_update_enqueue);
3215
3216 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3217
3218 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3219  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3220  * other synchronous requests, however keeping some locks and trying to obtain
3221  * others may take a considerable amount of time in a case of ost failure; and
3222  * when other sync requests do not get released lock from a client, the client
3223  * is excluded from the cluster -- such scenarious make the life difficult, so
3224  * release locks just after they are obtained. */
3225 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3226                      int *flags, ldlm_policy_data_t *policy,
3227                      struct ost_lvb *lvb, int kms_valid,
3228                      obd_enqueue_update_f upcall, void *cookie,
3229                      struct ldlm_enqueue_info *einfo,
3230                      struct lustre_handle *lockh,
3231                      struct ptlrpc_request_set *rqset, int async)
3232 {
3233         struct obd_device *obd = exp->exp_obd;
3234         struct ptlrpc_request *req = NULL;
3235         int intent = *flags & LDLM_FL_HAS_INTENT;
3236         ldlm_mode_t mode;
3237         int rc;
3238         ENTRY;
3239
3240         /* Filesystem lock extents are extended to page boundaries so that
3241          * dealing with the page cache is a little smoother.  */
3242         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3243         policy->l_extent.end |= ~CFS_PAGE_MASK;
3244
3245         /*
3246          * kms is not valid when either object is completely fresh (so that no
3247          * locks are cached), or object was evicted. In the latter case cached
3248          * lock cannot be used, because it would prime inode state with
3249          * potentially stale LVB.
3250          */
3251         if (!kms_valid)
3252                 goto no_match;
3253
3254         /* Next, search for already existing extent locks that will cover us */
3255         /* If we're trying to read, we also search for an existing PW lock.  The
3256          * VFS and page cache already protect us locally, so lots of readers/
3257          * writers can share a single PW lock.
3258          *
3259          * There are problems with conversion deadlocks, so instead of
3260          * converting a read lock to a write lock, we'll just enqueue a new
3261          * one.
3262          *
3263          * At some point we should cancel the read lock instead of making them
3264          * send us a blocking callback, but there are problems with canceling
3265          * locks out from other users right now, too. */
3266         mode = einfo->ei_mode;
3267         if (einfo->ei_mode == LCK_PR)
3268                 mode |= LCK_PW;
3269         mode = ldlm_lock_match(obd->obd_namespace,
3270                                *flags | LDLM_FL_LVB_READY, res_id,
3271                                einfo->ei_type, policy, mode, lockh, 0);
3272         if (mode) {
3273                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3274
3275                 if (matched->l_ast_data == NULL ||
3276                     matched->l_ast_data == einfo->ei_cbdata) {
3277                         /* addref the lock only if not async requests and PW
3278                          * lock is matched whereas we asked for PR. */
3279                         if (!rqset && einfo->ei_mode != mode)
3280                                 ldlm_lock_addref(lockh, LCK_PR);
3281                         osc_set_lock_data_with_check(matched, einfo, *flags);
3282                         if (intent) {
3283                                 /* I would like to be able to ASSERT here that
3284                                  * rss <= kms, but I can't, for reasons which
3285                                  * are explained in lov_enqueue() */
3286                         }
3287
3288                         /* We already have a lock, and it's referenced */
3289                         (*upcall)(cookie, ELDLM_OK);
3290
3291                         /* For async requests, decref the lock. */
3292                         if (einfo->ei_mode != mode)
3293                                 ldlm_lock_decref(lockh, LCK_PW);
3294                         else if (rqset)
3295                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3296                         LDLM_LOCK_PUT(matched);
3297                         RETURN(ELDLM_OK);
3298                 } else
3299                         ldlm_lock_decref(lockh, mode);
3300                 LDLM_LOCK_PUT(matched);
3301         }
3302
3303  no_match:
3304         if (intent) {
3305                 CFS_LIST_HEAD(cancels);
3306                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3307                                            &RQF_LDLM_ENQUEUE_LVB);
3308                 if (req == NULL)
3309                         RETURN(-ENOMEM);
3310
3311                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3312                 if (rc)
3313                         RETURN(rc);
3314
3315                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3316                                      sizeof *lvb);
3317                 ptlrpc_request_set_replen(req);
3318         }
3319
3320         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3321         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3322
3323         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3324                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3325         if (rqset) {
3326                 if (!rc) {
3327                         struct osc_enqueue_args *aa;
3328                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3329                         aa = ptlrpc_req_async_args(req);
3330                         aa->oa_ei = einfo;
3331                         aa->oa_exp = exp;
3332                         aa->oa_flags  = flags;
3333                         aa->oa_upcall = upcall;
3334                         aa->oa_cookie = cookie;
3335                         aa->oa_lvb    = lvb;
3336                         aa->oa_lockh  = lockh;
3337
3338                         req->rq_interpret_reply =
3339                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3340                         if (rqset == PTLRPCD_SET)
3341                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3342                         else
3343                                 ptlrpc_set_add_req(rqset, req);
3344                 } else if (intent) {
3345                         ptlrpc_req_finished(req);
3346                 }
3347                 RETURN(rc);
3348         }
3349
3350         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3351         if (intent)
3352                 ptlrpc_req_finished(req);
3353
3354         RETURN(rc);
3355 }
3356
3357 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3358                        struct ldlm_enqueue_info *einfo,
3359                        struct ptlrpc_request_set *rqset)
3360 {
3361         struct ldlm_res_id res_id;
3362         int rc;
3363         ENTRY;
3364
3365         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3366                            oinfo->oi_md->lsm_object_gr, &res_id);
3367
3368         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3369                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3370                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3371                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3372                               rqset, rqset != NULL);
3373         RETURN(rc);
3374 }
3375
3376 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3377                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3378                    int *flags, void *data, struct lustre_handle *lockh,
3379                    int unref)
3380 {
3381         struct obd_device *obd = exp->exp_obd;
3382         int lflags = *flags;
3383         ldlm_mode_t rc;
3384         ENTRY;
3385
3386         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3387                 RETURN(-EIO);
3388
3389         /* Filesystem lock extents are extended to page boundaries so that
3390          * dealing with the page cache is a little smoother */
3391         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3392         policy->l_extent.end |= ~CFS_PAGE_MASK;
3393
3394         /* Next, search for already existing extent locks that will cover us */
3395         /* If we're trying to read, we also search for an existing PW lock.  The
3396          * VFS and page cache already protect us locally, so lots of readers/
3397          * writers can share a single PW lock. */
3398         rc = mode;
3399         if (mode == LCK_PR)
3400                 rc |= LCK_PW;
3401         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3402                              res_id, type, policy, rc, lockh, unref);
3403         if (rc) {
3404                 if (data != NULL)
3405                         osc_set_data_with_check(lockh, data, lflags);
3406                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3407                         ldlm_lock_addref(lockh, LCK_PR);
3408                         ldlm_lock_decref(lockh, LCK_PW);
3409                 }
3410                 RETURN(rc);
3411         }
3412         RETURN(rc);
3413 }
3414
3415 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3416 {
3417         ENTRY;
3418
3419         if (unlikely(mode == LCK_GROUP))
3420                 ldlm_lock_decref_and_cancel(lockh, mode);
3421         else
3422                 ldlm_lock_decref(lockh, mode);
3423
3424         RETURN(0);
3425 }
3426
3427 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3428                       __u32 mode, struct lustre_handle *lockh)
3429 {
3430         ENTRY;
3431         RETURN(osc_cancel_base(lockh, mode));
3432 }
3433
3434 static int osc_cancel_unused(struct obd_export *exp,
3435                              struct lov_stripe_md *lsm, int flags,
3436                              void *opaque)
3437 {
3438         struct obd_device *obd = class_exp2obd(exp);
3439         struct ldlm_res_id res_id, *resp = NULL;
3440
3441         if (lsm != NULL) {
3442                 resp = osc_build_res_name(lsm->lsm_object_id,
3443                                           lsm->lsm_object_gr, &res_id);
3444         }
3445
3446         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3447 }
3448
3449 static int osc_statfs_interpret(const struct lu_env *env,
3450                                 struct ptlrpc_request *req,
3451                                 struct osc_async_args *aa, int rc)
3452 {
3453         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3454         struct obd_statfs *msfs;
3455         ENTRY;
3456
3457         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3458             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3459                 GOTO(out, rc = 0);
3460
3461         if (rc != 0)
3462                 GOTO(out, rc);
3463
3464         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3465         if (msfs == NULL) {
3466                 GOTO(out, rc = -EPROTO);
3467         }
3468
3469         /* Reinitialize the RDONLY and DEGRADED flags at the client
3470          * on each statfs, so they don't stay set permanently. */
3471         spin_lock(&cli->cl_oscc.oscc_lock);
3472         cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_RDONLY | OSCC_FLAG_DEGRADED);
3473         if (msfs->os_state & OS_STATE_DEGRADED)
3474                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3475
3476         if (msfs->os_state & OS_STATE_READONLY)
3477                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3478         spin_unlock(&cli->cl_oscc.oscc_lock);
3479
3480         *aa->aa_oi->oi_osfs = *msfs;
3481 out:
3482         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3483         RETURN(rc);
3484 }
3485
3486 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3487                             __u64 max_age, struct ptlrpc_request_set *rqset)
3488 {
3489         struct ptlrpc_request *req;
3490         struct osc_async_args *aa;
3491         int                    rc;
3492         ENTRY;
3493
3494         /* We could possibly pass max_age in the request (as an absolute
3495          * timestamp or a "seconds.usec ago") so the target can avoid doing
3496          * extra calls into the filesystem if that isn't necessary (e.g.
3497          * during mount that would help a bit).  Having relative timestamps
3498          * is not so great if request processing is slow, while absolute
3499          * timestamps are not ideal because they need time synchronization. */
3500         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3501         if (req == NULL)
3502                 RETURN(-ENOMEM);
3503
3504         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3505         if (rc) {
3506                 ptlrpc_request_free(req);
3507                 RETURN(rc);
3508         }
3509         ptlrpc_request_set_replen(req);
3510         req->rq_request_portal = OST_CREATE_PORTAL;
3511         ptlrpc_at_set_req_timeout(req);
3512
3513         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3514                 /* procfs requests not want stat in wait for avoid deadlock */
3515                 req->rq_no_resend = 1;
3516                 req->rq_no_delay = 1;
3517         }
3518
3519         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3520         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3521         aa = ptlrpc_req_async_args(req);
3522         aa->aa_oi = oinfo;
3523
3524         ptlrpc_set_add_req(rqset, req);
3525         RETURN(0);
3526 }
3527
3528 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3529                       __u64 max_age, __u32 flags)
3530 {
3531         struct obd_statfs     *msfs;
3532         struct ptlrpc_request *req;
3533         struct obd_import     *imp = NULL;
3534         int rc;
3535         ENTRY;
3536
3537         /*Since the request might also come from lprocfs, so we need
3538          *sync this with client_disconnect_export Bug15684*/
3539         down_read(&obd->u.cli.cl_sem);
3540         if (obd->u.cli.cl_import)
3541                 imp = class_import_get(obd->u.cli.cl_import);
3542         up_read(&obd->u.cli.cl_sem);
3543         if (!imp)
3544                 RETURN(-ENODEV);
3545
3546         /* We could possibly pass max_age in the request (as an absolute
3547          * timestamp or a "seconds.usec ago") so the target can avoid doing
3548          * extra calls into the filesystem if that isn't necessary (e.g.
3549          * during mount that would help a bit).  Having relative timestamps
3550          * is not so great if request processing is slow, while absolute
3551          * timestamps are not ideal because they need time synchronization. */
3552         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3553
3554         class_import_put(imp);
3555
3556         if (req == NULL)
3557                 RETURN(-ENOMEM);
3558
3559         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3560         if (rc) {
3561                 ptlrpc_request_free(req);
3562                 RETURN(rc);
3563         }
3564         ptlrpc_request_set_replen(req);
3565         req->rq_request_portal = OST_CREATE_PORTAL;
3566         ptlrpc_at_set_req_timeout(req);
3567
3568         if (flags & OBD_STATFS_NODELAY) {
3569                 /* procfs requests not want stat in wait for avoid deadlock */
3570                 req->rq_no_resend = 1;
3571                 req->rq_no_delay = 1;
3572         }
3573
3574         rc = ptlrpc_queue_wait(req);
3575         if (rc)
3576                 GOTO(out, rc);
3577
3578         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3579         if (msfs == NULL) {
3580                 GOTO(out, rc = -EPROTO);
3581         }
3582
3583         *osfs = *msfs;
3584
3585         EXIT;
3586  out:
3587         ptlrpc_req_finished(req);
3588         return rc;
3589 }
3590
3591 /* Retrieve object striping information.
3592  *
3593  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3594  * the maximum number of OST indices which will fit in the user buffer.
3595  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3596  */
3597 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3598 {
3599         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3600         struct lov_user_md_v3 lum, *lumk;
3601         struct lov_user_ost_data_v1 *lmm_objects;
3602         int rc = 0, lum_size;
3603         ENTRY;
3604
3605         if (!lsm)
3606                 RETURN(-ENODATA);
3607
3608         /* we only need the header part from user space to get lmm_magic and
3609          * lmm_stripe_count, (the header part is common to v1 and v3) */
3610         lum_size = sizeof(struct lov_user_md_v1);
3611         if (copy_from_user(&lum, lump, lum_size))
3612                 RETURN(-EFAULT);
3613
3614         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3615             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3616                 RETURN(-EINVAL);
3617
3618         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3619         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3620         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3621         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3622
3623         /* we can use lov_mds_md_size() to compute lum_size
3624          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3625         if (lum.lmm_stripe_count > 0) {
3626                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3627                 OBD_ALLOC(lumk, lum_size);
3628                 if (!lumk)
3629                         RETURN(-ENOMEM);
3630
3631                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3632                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3633                 else
3634                         lmm_objects = &(lumk->lmm_objects[0]);
3635                 lmm_objects->l_object_id = lsm->lsm_object_id;
3636         } else {
3637                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3638                 lumk = &lum;
3639         }
3640
3641         lumk->lmm_object_id = lsm->lsm_object_id;
3642         lumk->lmm_object_gr = lsm->lsm_object_gr;
3643         lumk->lmm_stripe_count = 1;
3644
3645         if (copy_to_user(lump, lumk, lum_size))
3646                 rc = -EFAULT;
3647
3648         if (lumk != &lum)
3649                 OBD_FREE(lumk, lum_size);
3650
3651         RETURN(rc);
3652 }
3653
3654
3655 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3656                          void *karg, void *uarg)
3657 {
3658         struct obd_device *obd = exp->exp_obd;
3659         struct obd_ioctl_data *data = karg;
3660         int err = 0;
3661         ENTRY;
3662
3663         if (!try_module_get(THIS_MODULE)) {
3664                 CERROR("Can't get module. Is it alive?");
3665                 return -EINVAL;
3666         }
3667         switch (cmd) {
3668         case OBD_IOC_LOV_GET_CONFIG: {
3669                 char *buf;
3670                 struct lov_desc *desc;
3671                 struct obd_uuid uuid;
3672
3673                 buf = NULL;
3674                 len = 0;
3675                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3676                         GOTO(out, err = -EINVAL);
3677
3678                 data = (struct obd_ioctl_data *)buf;
3679
3680                 if (sizeof(*desc) > data->ioc_inllen1) {
3681                         obd_ioctl_freedata(buf, len);
3682                         GOTO(out, err = -EINVAL);
3683                 }
3684
3685                 if (data->ioc_inllen2 < sizeof(uuid)) {
3686                         obd_ioctl_freedata(buf, len);
3687                         GOTO(out, err = -EINVAL);
3688                 }
3689
3690                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3691                 desc->ld_tgt_count = 1;
3692                 desc->ld_active_tgt_count = 1;
3693                 desc->ld_default_stripe_count = 1;
3694                 desc->ld_default_stripe_size = 0;
3695                 desc->ld_default_stripe_offset = 0;
3696                 desc->ld_pattern = 0;
3697                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3698
3699                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3700
3701                 err = copy_to_user((void *)uarg, buf, len);
3702                 if (err)
3703                         err = -EFAULT;
3704                 obd_ioctl_freedata(buf, len);
3705                 GOTO(out, err);
3706         }
3707         case LL_IOC_LOV_SETSTRIPE:
3708                 err = obd_alloc_memmd(exp, karg);
3709                 if (err > 0)
3710                         err = 0;
3711                 GOTO(out, err);
3712         case LL_IOC_LOV_GETSTRIPE:
3713                 err = osc_getstripe(karg, uarg);
3714                 GOTO(out, err);
3715         case OBD_IOC_CLIENT_RECOVER:
3716                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3717                                             data->ioc_inlbuf1);
3718                 if (err > 0)
3719                         err = 0;
3720                 GOTO(out, err);
3721         case IOC_OSC_SET_ACTIVE:
3722                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3723                                                data->ioc_offset);
3724                 GOTO(out, err);
3725         case OBD_IOC_POLL_QUOTACHECK:
3726                 err = lquota_poll_check(quota_interface, exp,
3727                                         (struct if_quotacheck *)karg);
3728                 GOTO(out, err);
3729         case OBD_IOC_PING_TARGET:
3730                 err = ptlrpc_obd_ping(obd);
3731                 GOTO(out, err);
3732         default:
3733                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3734                        cmd, cfs_curproc_comm());
3735                 GOTO(out, err = -ENOTTY);
3736         }
3737 out:
3738         module_put(THIS_MODULE);
3739         return err;
3740 }
3741
3742 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3743                         void *key, __u32 *vallen, void *val,
3744                         struct lov_stripe_md *lsm)
3745 {
3746         ENTRY;
3747         if (!vallen || !val)
3748                 RETURN(-EFAULT);
3749
3750         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3751                 __u32 *stripe = val;
3752                 *vallen = sizeof(*stripe);
3753                 *stripe = 0;
3754                 RETURN(0);
3755         } else if (KEY_IS(KEY_LAST_ID)) {
3756                 struct ptlrpc_request *req;
3757                 obd_id                *reply;
3758                 char                  *tmp;
3759                 int                    rc;
3760
3761                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3762                                            &RQF_OST_GET_INFO_LAST_ID);
3763                 if (req == NULL)
3764                         RETURN(-ENOMEM);
3765
3766                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3767                                      RCL_CLIENT, keylen);
3768                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3769                 if (rc) {
3770                         ptlrpc_request_free(req);
3771                         RETURN(rc);
3772                 }
3773
3774                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3775                 memcpy(tmp, key, keylen);
3776
3777                 req->rq_no_delay = req->rq_no_resend = 1;
3778                 ptlrpc_request_set_replen(req);
3779                 rc = ptlrpc_queue_wait(req);
3780                 if (rc)
3781                         GOTO(out, rc);
3782
3783                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3784                 if (reply == NULL)
3785                         GOTO(out, rc = -EPROTO);
3786
3787                 *((obd_id *)val) = *reply;
3788         out:
3789                 ptlrpc_req_finished(req);
3790                 RETURN(rc);
3791         } else if (KEY_IS(KEY_FIEMAP)) {
3792                 struct ptlrpc_request *req;
3793                 struct ll_user_fiemap *reply;
3794                 char *tmp;
3795                 int rc;
3796
3797                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3798                                            &RQF_OST_GET_INFO_FIEMAP);
3799                 if (req == NULL)
3800                         RETURN(-ENOMEM);
3801
3802                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3803                                      RCL_CLIENT, keylen);
3804                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3805                                      RCL_CLIENT, *vallen);
3806                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3807                                      RCL_SERVER, *vallen);
3808
3809                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3810                 if (rc) {
3811                         ptlrpc_request_free(req);
3812                         RETURN(rc);
3813                 }
3814
3815                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3816                 memcpy(tmp, key, keylen);
3817                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3818                 memcpy(tmp, val, *vallen);
3819
3820                 ptlrpc_request_set_replen(req);
3821                 rc = ptlrpc_queue_wait(req);
3822                 if (rc)
3823                         GOTO(out1, rc);
3824
3825                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3826                 if (reply == NULL)
3827                         GOTO(out1, rc = -EPROTO);
3828
3829                 memcpy(val, reply, *vallen);
3830         out1:
3831                 ptlrpc_req_finished(req);
3832
3833                 RETURN(rc);
3834         }
3835
3836         RETURN(-EINVAL);
3837 }
3838
3839 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3840 {
3841         struct llog_ctxt *ctxt;
3842         int rc = 0;
3843         ENTRY;
3844
3845         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3846         if (ctxt) {
3847                 rc = llog_initiator_connect(ctxt);
3848                 llog_ctxt_put(ctxt);
3849         } else {
3850                 /* XXX return an error? skip setting below flags? */
3851         }
3852
3853         spin_lock(&imp->imp_lock);
3854         imp->imp_server_timeout = 1;
3855         imp->imp_pingable = 1;
3856         spin_unlock(&imp->imp_lock);
3857         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3858
3859         RETURN(rc);
3860 }
3861
3862 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3863                                           struct ptlrpc_request *req,
3864                                           void *aa, int rc)
3865 {
3866         ENTRY;
3867         if (rc != 0)
3868                 RETURN(rc);
3869
3870         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3871 }
3872
3873 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3874                               void *key, obd_count vallen, void *val,
3875                               struct ptlrpc_request_set *set)
3876 {
3877         struct ptlrpc_request *req;
3878         struct obd_device     *obd = exp->exp_obd;
3879         struct obd_import     *imp = class_exp2cliimp(exp);
3880         char                  *tmp;
3881         int                    rc;
3882         ENTRY;
3883
3884         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3885
3886         if (KEY_IS(KEY_NEXT_ID)) {
3887                 if (vallen != sizeof(obd_id))
3888                         RETURN(-ERANGE);
3889                 if (val == NULL)
3890                         RETURN(-EINVAL);
3891                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3892                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3893                        exp->exp_obd->obd_name,
3894                        obd->u.cli.cl_oscc.oscc_next_id);
3895
3896                 RETURN(0);
3897         }
3898
3899         if (KEY_IS(KEY_UNLINKED)) {
3900                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3901                 spin_lock(&oscc->oscc_lock);
3902                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3903                 spin_unlock(&oscc->oscc_lock);
3904                 RETURN(0);
3905         }
3906
3907         if (KEY_IS(KEY_INIT_RECOV)) {
3908                 if (vallen != sizeof(int))
3909                         RETURN(-EINVAL);
3910                 spin_lock(&imp->imp_lock);
3911                 imp->imp_initial_recov = *(int *)val;
3912                 spin_unlock(&imp->imp_lock);
3913                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3914                        exp->exp_obd->obd_name,
3915                        imp->imp_initial_recov);
3916                 RETURN(0);
3917         }
3918
3919         if (KEY_IS(KEY_CHECKSUM)) {
3920                 if (vallen != sizeof(int))
3921                         RETURN(-EINVAL);
3922                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3923                 RETURN(0);
3924         }
3925
3926         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3927                 sptlrpc_conf_client_adapt(obd);
3928                 RETURN(0);
3929         }
3930
3931         if (KEY_IS(KEY_FLUSH_CTX)) {
3932                 sptlrpc_import_flush_my_ctx(imp);
3933                 RETURN(0);
3934         }
3935
3936         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3937                 RETURN(-EINVAL);
3938
3939         /* We pass all other commands directly to OST. Since nobody calls osc
3940            methods directly and everybody is supposed to go through LOV, we
3941            assume lov checked invalid values for us.
3942            The only recognised values so far are evict_by_nid and mds_conn.
3943            Even if something bad goes through, we'd get a -EINVAL from OST
3944            anyway. */
3945
3946         if (KEY_IS(KEY_GRANT_SHRINK))
3947                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3948         else
3949                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
3950
3951         if (req == NULL)
3952                 RETURN(-ENOMEM);
3953
3954         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3955                              RCL_CLIENT, keylen);
3956         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3957                              RCL_CLIENT, vallen);
3958         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3959         if (rc) {
3960                 ptlrpc_request_free(req);
3961                 RETURN(rc);
3962         }
3963
3964         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3965         memcpy(tmp, key, keylen);
3966         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3967         memcpy(tmp, val, vallen);
3968
3969         if (KEY_IS(KEY_MDS_CONN)) {
3970                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3971
3972                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3973                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3974                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
3975                 req->rq_no_delay = req->rq_no_resend = 1;
3976                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3977         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3978                 struct osc_grant_args *aa;
3979                 struct obdo *oa;
3980
3981                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3982                 aa = ptlrpc_req_async_args(req);
3983                 OBD_ALLOC_PTR(oa);
3984                 if (!oa) {
3985                         ptlrpc_req_finished(req);
3986                         RETURN(-ENOMEM);
3987                 }
3988                 *oa = ((struct ost_body *)val)->oa;
3989                 aa->aa_oa = oa;
3990                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3991         }
3992
3993         ptlrpc_request_set_replen(req);
3994         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3995                 LASSERT(set != NULL);
3996                 ptlrpc_set_add_req(set, req);
3997                 ptlrpc_check_set(NULL, set);
3998         } else
3999                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4000
4001         RETURN(0);
4002 }
4003
4004
4005 static struct llog_operations osc_size_repl_logops = {
4006         lop_cancel: llog_obd_repl_cancel
4007 };
4008
4009 static struct llog_operations osc_mds_ost_orig_logops;
4010
4011 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4012                            struct obd_device *tgt, struct llog_catid *catid)
4013 {
4014         int rc;
4015         ENTRY;
4016
4017         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4018                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4019         if (rc) {
4020                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4021                 GOTO(out, rc);
4022         }
4023
4024         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4025                         NULL, &osc_size_repl_logops);
4026         if (rc) {
4027                 struct llog_ctxt *ctxt =
4028                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4029                 if (ctxt)
4030                         llog_cleanup(ctxt);
4031                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4032         }
4033         GOTO(out, rc);
4034 out:
4035         if (rc) {
4036                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4037                        obd->obd_name, tgt->obd_name, catid, rc);
4038                 CERROR("logid "LPX64":0x%x\n",
4039                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4040         }
4041         return rc;
4042 }
4043
4044 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4045                          struct obd_device *disk_obd, int *index)
4046 {
4047         struct llog_catid catid;
4048         static char name[32] = CATLIST;
4049         int rc;
4050         ENTRY;
4051
4052         LASSERT(olg == &obd->obd_olg);
4053
4054         mutex_down(&olg->olg_cat_processing);
4055         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4056         if (rc) {
4057                 CERROR("rc: %d\n", rc);
4058                 GOTO(out, rc);
4059         }
4060
4061         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4062                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4063                catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4064
4065         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4066         if (rc) {
4067                 CERROR("rc: %d\n", rc);
4068                 GOTO(out, rc);
4069         }
4070
4071         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4072         if (rc) {
4073                 CERROR("rc: %d\n", rc);
4074                 GOTO(out, rc);
4075         }
4076
4077  out:
4078         mutex_up(&olg->olg_cat_processing);
4079
4080         return rc;
4081 }
4082
4083 static int osc_llog_finish(struct obd_device *obd, int count)
4084 {
4085         struct llog_ctxt *ctxt;
4086         int rc = 0, rc2 = 0;
4087         ENTRY;
4088
4089         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4090         if (ctxt)
4091                 rc = llog_cleanup(ctxt);
4092
4093         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4094         if (ctxt)
4095                 rc2 = llog_cleanup(ctxt);
4096         if (!rc)
4097                 rc = rc2;
4098
4099         RETURN(rc);
4100 }
4101
4102 static int osc_reconnect(const struct lu_env *env,
4103                          struct obd_export *exp, struct obd_device *obd,
4104                          struct obd_uuid *cluuid,
4105                          struct obd_connect_data *data,
4106                          void *localdata)
4107 {
4108         struct client_obd *cli = &obd->u.cli;
4109
4110         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4111                 long lost_grant;
4112
4113                 client_obd_list_lock(&cli->cl_loi_list_lock);
4114                 data->ocd_grant = cli->cl_avail_grant ?:
4115                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4116                 lost_grant = cli->cl_lost_grant;
4117                 cli->cl_lost_grant = 0;
4118                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4119
4120                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4121                        "cl_lost_grant: %ld\n", data->ocd_grant,
4122                        cli->cl_avail_grant, lost_grant);
4123                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4124                        " ocd_grant: %d\n", data->ocd_connect_flags,
4125                        data->ocd_version, data->ocd_grant);
4126         }
4127
4128         RETURN(0);
4129 }
4130
4131 static int osc_disconnect(struct obd_export *exp)
4132 {
4133         struct obd_device *obd = class_exp2obd(exp);
4134         struct llog_ctxt  *ctxt;
4135         int rc;
4136
4137         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4138         if (ctxt) {
4139                 if (obd->u.cli.cl_conn_count == 1) {
4140                         /* Flush any remaining cancel messages out to the
4141                          * target */
4142                         llog_sync(ctxt, exp);
4143                 }
4144                 llog_ctxt_put(ctxt);
4145         } else {
4146                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4147                        obd);
4148         }
4149
4150         rc = client_disconnect_export(exp);
4151         /**
4152          * Initially we put del_shrink_grant before disconnect_export, but it
4153          * causes the following problem if setup (connect) and cleanup
4154          * (disconnect) are tangled together.
4155          *      connect p1                     disconnect p2
4156          *   ptlrpc_connect_import
4157          *     ...............               class_manual_cleanup
4158          *                                     osc_disconnect
4159          *                                     del_shrink_grant
4160          *   ptlrpc_connect_interrupt
4161          *     init_grant_shrink
4162          *   add this client to shrink list
4163          *                                      cleanup_osc
4164          * Bang! pinger trigger the shrink.
4165          * So the osc should be disconnected from the shrink list, after we
4166          * are sure the import has been destroyed. BUG18662
4167          */
4168         if (obd->u.cli.cl_import == NULL)
4169                 osc_del_shrink_grant(&obd->u.cli);
4170         return rc;
4171 }
4172
4173 static int osc_import_event(struct obd_device *obd,
4174                             struct obd_import *imp,
4175                             enum obd_import_event event)
4176 {
4177         struct client_obd *cli;
4178         int rc = 0;
4179
4180         ENTRY;
4181         LASSERT(imp->imp_obd == obd);
4182
4183         switch (event) {
4184         case IMP_EVENT_DISCON: {
4185                 /* Only do this on the MDS OSC's */
4186                 if (imp->imp_server_timeout) {
4187                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4188
4189                         spin_lock(&oscc->oscc_lock);
4190                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4191                         spin_unlock(&oscc->oscc_lock);
4192                 }
4193                 cli = &obd->u.cli;
4194                 client_obd_list_lock(&cli->cl_loi_list_lock);
4195                 cli->cl_avail_grant = 0;
4196                 cli->cl_lost_grant = 0;
4197                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4198                 break;
4199         }
4200         case IMP_EVENT_INACTIVE: {
4201                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4202                 break;
4203         }
4204         case IMP_EVENT_INVALIDATE: {
4205                 struct ldlm_namespace *ns = obd->obd_namespace;
4206                 struct lu_env         *env;
4207                 int                    refcheck;
4208
4209                 env = cl_env_get(&refcheck);
4210                 if (!IS_ERR(env)) {
4211                         /* Reset grants */
4212                         cli = &obd->u.cli;
4213                         client_obd_list_lock(&cli->cl_loi_list_lock);
4214                         /* all pages go to failing rpcs due to the invalid
4215                          * import */
4216                         osc_check_rpcs(env, cli);
4217                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4218
4219                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4220                         cl_env_put(env, &refcheck);
4221                 } else
4222                         rc = PTR_ERR(env);
4223                 break;
4224         }
4225         case IMP_EVENT_ACTIVE: {
4226                 /* Only do this on the MDS OSC's */
4227                 if (imp->imp_server_timeout) {
4228                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4229
4230                         spin_lock(&oscc->oscc_lock);
4231                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4232                         spin_unlock(&oscc->oscc_lock);
4233                 }
4234                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4235                 break;
4236         }
4237         case IMP_EVENT_OCD: {
4238                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4239
4240                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4241                         osc_init_grant(&obd->u.cli, ocd);
4242
4243                 /* See bug 7198 */
4244                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4245                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4246
4247                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4248                 break;
4249         }
4250         default:
4251                 CERROR("Unknown import event %d\n", event);
4252                 LBUG();
4253         }
4254         RETURN(rc);
4255 }
4256
4257 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4258 {
4259         int rc;
4260         ENTRY;
4261
4262         ENTRY;
4263         rc = ptlrpcd_addref();
4264         if (rc)
4265                 RETURN(rc);
4266
4267         rc = client_obd_setup(obd, lcfg);
4268         if (rc) {
4269                 ptlrpcd_decref();
4270         } else {
4271                 struct lprocfs_static_vars lvars = { 0 };
4272                 struct client_obd *cli = &obd->u.cli;
4273
4274                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4275                 lprocfs_osc_init_vars(&lvars);
4276                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4277                         lproc_osc_attach_seqstat(obd);
4278                         sptlrpc_lprocfs_cliobd_attach(obd);
4279                         ptlrpc_lprocfs_register_obd(obd);
4280                 }
4281
4282                 oscc_init(obd);
4283                 /* We need to allocate a few requests more, because
4284                    brw_interpret tries to create new requests before freeing
4285                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4286                    reserved, but I afraid that might be too much wasted RAM
4287                    in fact, so 2 is just my guess and still should work. */
4288                 cli->cl_import->imp_rq_pool =
4289                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4290                                             OST_MAXREQSIZE,
4291                                             ptlrpc_add_rqs_to_pool);
4292
4293                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4294                 sema_init(&cli->cl_grant_sem, 1);
4295         }
4296
4297         RETURN(rc);
4298 }
4299
4300 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4301 {
4302         int rc = 0;
4303         ENTRY;
4304
4305         switch (stage) {
4306         case OBD_CLEANUP_EARLY: {
4307                 struct obd_import *imp;
4308                 imp = obd->u.cli.cl_import;
4309                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4310                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4311                 ptlrpc_deactivate_import(imp);
4312                 spin_lock(&imp->imp_lock);
4313                 imp->imp_pingable = 0;
4314                 spin_unlock(&imp->imp_lock);
4315                 break;
4316         }
4317         case OBD_CLEANUP_EXPORTS: {
4318                 /* If we set up but never connected, the
4319                    client import will not have been cleaned. */
4320                 if (obd->u.cli.cl_import) {
4321                         struct obd_import *imp;
4322                         down_write(&obd->u.cli.cl_sem);
4323                         imp = obd->u.cli.cl_import;
4324                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4325                                obd->obd_name);
4326                         ptlrpc_invalidate_import(imp);
4327                         if (imp->imp_rq_pool) {
4328                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4329                                 imp->imp_rq_pool = NULL;
4330                         }
4331                         class_destroy_import(imp);
4332                         up_write(&obd->u.cli.cl_sem);
4333                         obd->u.cli.cl_import = NULL;
4334                 }
4335                 rc = obd_llog_finish(obd, 0);
4336                 if (rc != 0)
4337                         CERROR("failed to cleanup llogging subsystems\n");
4338                 break;
4339                 }
4340         }
4341         RETURN(rc);
4342 }
4343
4344 int osc_cleanup(struct obd_device *obd)
4345 {
4346         int rc;
4347
4348         ENTRY;
4349         ptlrpc_lprocfs_unregister_obd(obd);
4350         lprocfs_obd_cleanup(obd);
4351
4352         /* free memory of osc quota cache */
4353         lquota_cleanup(quota_interface, obd);
4354
4355         rc = client_obd_cleanup(obd);
4356
4357         ptlrpcd_decref();
4358         RETURN(rc);
4359 }
4360
4361 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4362 {
4363         struct lprocfs_static_vars lvars = { 0 };
4364         int rc = 0;
4365
4366         lprocfs_osc_init_vars(&lvars);
4367
4368         switch (lcfg->lcfg_command) {
4369         default:
4370                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4371                                               lcfg, obd);
4372                 if (rc > 0)
4373                         rc = 0;
4374                 break;
4375         }
4376
4377         return(rc);
4378 }
4379
4380 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4381 {
4382         return osc_process_config_base(obd, buf);
4383 }
4384
4385 struct obd_ops osc_obd_ops = {
4386         .o_owner                = THIS_MODULE,
4387         .o_setup                = osc_setup,
4388         .o_precleanup           = osc_precleanup,
4389         .o_cleanup              = osc_cleanup,
4390         .o_add_conn             = client_import_add_conn,
4391         .o_del_conn             = client_import_del_conn,
4392         .o_connect              = client_connect_import,
4393         .o_reconnect            = osc_reconnect,
4394         .o_disconnect           = osc_disconnect,
4395         .o_statfs               = osc_statfs,
4396         .o_statfs_async         = osc_statfs_async,
4397         .o_packmd               = osc_packmd,
4398         .o_unpackmd             = osc_unpackmd,
4399         .o_precreate            = osc_precreate,
4400         .o_create               = osc_create,
4401         .o_create_async         = osc_create_async,
4402         .o_destroy              = osc_destroy,
4403         .o_getattr              = osc_getattr,
4404         .o_getattr_async        = osc_getattr_async,
4405         .o_setattr              = osc_setattr,
4406         .o_setattr_async        = osc_setattr_async,
4407         .o_brw                  = osc_brw,
4408         .o_punch                = osc_punch,
4409         .o_sync                 = osc_sync,
4410         .o_enqueue              = osc_enqueue,
4411         .o_change_cbdata        = osc_change_cbdata,
4412         .o_cancel               = osc_cancel,
4413         .o_cancel_unused        = osc_cancel_unused,
4414         .o_iocontrol            = osc_iocontrol,
4415         .o_get_info             = osc_get_info,
4416         .o_set_info_async       = osc_set_info_async,
4417         .o_import_event         = osc_import_event,
4418         .o_llog_init            = osc_llog_init,
4419         .o_llog_finish          = osc_llog_finish,
4420         .o_process_config       = osc_process_config,
4421 };
4422
4423 extern struct lu_kmem_descr  osc_caches[];
4424 extern spinlock_t            osc_ast_guard;
4425 extern struct lock_class_key osc_ast_guard_class;
4426
4427 int __init osc_init(void)
4428 {
4429         struct lprocfs_static_vars lvars = { 0 };
4430         int rc;
4431         ENTRY;
4432
4433         /* print an address of _any_ initialized kernel symbol from this
4434          * module, to allow debugging with gdb that doesn't support data
4435          * symbols from modules.*/
4436         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4437
4438         rc = lu_kmem_init(osc_caches);
4439
4440         lprocfs_osc_init_vars(&lvars);
4441
4442         request_module("lquota");
4443         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4444         lquota_init(quota_interface);
4445         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4446
4447         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4448                                  LUSTRE_OSC_NAME, &osc_device_type);
4449         if (rc) {
4450                 if (quota_interface)
4451                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4452                 lu_kmem_fini(osc_caches);
4453                 RETURN(rc);
4454         }
4455
4456         spin_lock_init(&osc_ast_guard);
4457         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4458
4459         osc_mds_ost_orig_logops = llog_lvfs_ops;
4460         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4461         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4462         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4463         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4464
4465         RETURN(rc);
4466 }
4467
#ifdef __KERNEL__
/**
 * Module exit: finalise the OSC device type, shut down the quota
 * interface and drop its symbol reference, unregister the obd type and
 * destroy the slab caches.
 */
static void /*__exit*/ osc_exit(void)
{
        lu_device_type_fini(&osc_device_type);

        /* quota: shut down first, then release the symbol reference
         * taken in osc_init() */
        lquota_exit(quota_interface);
        if (quota_interface != NULL) {
                PORTAL_SYMBOL_PUT(osc_quota_interface);
        }

        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif