LU-874 osc: prioritize writeback pages
lustre/osc/osc_request.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright (c) 2011 Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
                            int ptlrpc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
        }

        RETURN(lmm_size);
}
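
/*
 * Usage sketch (editor's illustration, not part of the original code):
 * osc_packmd() follows the common obd_packmd() calling convention, where
 * the lmmp argument selects between three modes.  The caller code below
 * is hypothetical.
 *
 *      int size = osc_packmd(exp, NULL, lsm);   // query packed size only
 *
 *      struct lov_mds_md *lmm = NULL;
 *      size = osc_packmd(exp, &lmm, lsm);       // allocate and pack *lmm
 *
 *      osc_packmd(exp, &lmm, NULL);             // free a previous buffer
 */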

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_seq = le64_to_cpu(lmm->lmm_object_seq);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
        }

        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
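
/*
 * Note (editor's sketch): osc_getattr_async() is an instance of the ptlrpc
 * async-request pattern used throughout this file: pack the request, stash
 * per-request state in rq_async_args (size-checked with CLASSERT), point
 * rq_interpret_reply at a callback, and queue the request on the caller's
 * set.  A hypothetical caller would drive it roughly like this:
 *
 *      struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *      rc = osc_getattr_async(exp, oinfo, set);
 *      if (rc == 0)
 *              rc = ptlrpc_set_wait(set);  // runs osc_getattr_interpret()
 *      ptlrpc_set_destroy(set);
 */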

static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(sa->sa_oa, &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
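
/*
 * Dispatch summary (editor's note, derived from the code above): the rqset
 * argument selects between three modes:
 *
 *      rqset == NULL        fire-and-forget via ptlrpcd; the reply is not
 *                           interpreted and upcall is never called;
 *      rqset == PTLRPCD_SET ptlrpcd sends the request and the upcall runs
 *                           from osc_setattr_interpret();
 *      otherwise            the request is added to the caller's set, which
 *                           the caller must wait on itself.
 */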

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_seq = oa->o_seq;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo,
                              oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_async_args *aa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
                    obd_size start, obd_size end,
                    struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        if (!oinfo->oi_oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

/* Find and cancel locally all locks matched by @mode in the resource
 * described by @oa.  Found locks are added to the @cancels list.  Returns
 * the number of locks added. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   cfs_list_t *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        cfs_atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
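
/*
 * Note (editor's sketch): the pair of atomic operations above implements a
 * lock-free throttle.  If the increment pushes the in-flight count past
 * cl_max_rpcs_in_flight, the slot is given back; the decrement's return
 * value is then re-checked because another sender may have backed off
 * between the two atomics, in which case a waiter parked in osc_destroy()
 * below must be woken so it can retry.
 */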

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and sync'd to disk (i.e. transaction
 * committed).  If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * next reconnects to the MDS, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing
 * destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * below max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1) {
                /* The cfs_atomic_read()s and the matching cfs_atomic_inc()s
                 * are not covered by a lock, so they may race and trip this
                 * CERROR() unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
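
/*
 * Worked example (editor's illustration, tunables assumed): with
 * cl_max_pages_per_rpc = 256, CFS_PAGE_SIZE = 4096 and
 * cl_max_rpcs_in_flight = 8, the o_undirty branch above computes
 *
 *      max_in_flight = (256 << 12) * (8 + 1) = 9437184 bytes (9 MiB)
 *
 * so with an assumed 32 MiB cl_dirty_max the client announces
 * max(cl_dirty_max, max_in_flight) = 32 MiB of o_undirty headroom.
 */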

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* The companion to osc_consume_write_grant(), called when a brw has
 * completed.  Must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
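
/*
 * Worked example (editor's illustration): the short-write rounding above
 * only matters when CFS_PAGE_SIZE != blocksize, e.g. 64 KiB pages over
 * 4 KiB OST blocks.  For a 5000-byte write at page offset 1024:
 *
 *      count = 5000 + (1024 & 4095)        = 6024
 *      end   = (1024 + 5000) & 4095        = 1928
 *      count += 4096 - 1928                = 8192   (two whole blocks)
 *      cl_lost_grant += 65536 - 8192       = 57344
 *
 * i.e. the OST charges grant for two full blocks, and the remainder of
 * the page's grant is accounted as lost.
 */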

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        cfs_list_t *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
                     obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if the cache is still dirty but there is no grant, wait for
                 * pending RPCs that may yet return us some grant before doing
                 * sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
                cfs_list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}

int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it is the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it is
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we are evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0, the OSS is probably not running"
                      " with the patch from bug20278 (%ld)\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
                /* workaround for 1.6 servers which do not have
                 * the patch from bug20278 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
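
/*
 * Worked example (editor's illustration, numbers assumed): if the server
 * granted ocd_grant = 2 MiB at connect time and the client already holds
 * cl_dirty = 512 KiB of dirty cache, the normal path above sets
 * cl_avail_grant = 2 MiB - 512 KiB = 1536 KiB.  After an eviction the
 * dirty pages will be discarded as in-flight RPCs fail out, so the whole
 * ocd_grant becomes the new cl_avail_grant.
 */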

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
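
/*
 * Worked example (editor's illustration): for a 3-page read of 4096-byte
 * pages where the OST returned nob_read = 5000 bytes, the loop above
 * consumes page 0 in full (5000 - 4096 = 904 left), finds EOF inside
 * page 1 and zeroes its bytes 904..4095, then zero-fills page 2 entirely.
 */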

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
                                  OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at http://bugs.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return fini_checksum(cksum, cksum_type);
}

static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1473
1474 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1475                                 __u32 client_cksum, __u32 server_cksum, int nob,
1476                                 obd_count page_count, struct brw_page **pga,
1477                                 cksum_type_t client_cksum_type)
1478 {
1479         __u32 new_cksum;
1480         char *msg;
1481         cksum_type_t cksum_type;
1482
1483         if (server_cksum == client_cksum) {
1484                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1485                 return 0;
1486         }
1487
1488         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1489                                        oa->o_flags : 0);
1490         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1491                                       cksum_type);
1492
1493         if (cksum_type != client_cksum_type)
1494                 msg = "the server did not use the checksum type specified in "
1495                       "the original request - likely a protocol problem";
1496         else if (new_cksum == server_cksum)
1497                 msg = "changed on the client after we checksummed it - "
1498                       "likely false positive due to mmap IO (bug 11742)";
1499         else if (new_cksum == client_cksum)
1500                 msg = "changed in transit before arrival at OST";
1501         else
1502                 msg = "changed in transit AND doesn't match the original - "
1503                       "likely false positive due to mmap IO (bug 11742)";
1504
1505         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1506                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1507                            msg, libcfs_nid2str(peer->nid),
1508                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1509                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1510                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1511                            oa->o_id,
1512                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1513                            pga[0]->off,
1514                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1515         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1516                "client csum now %x\n", client_cksum, client_cksum_type,
1517                server_cksum, cksum_type, new_cksum);
1518         return 1;
1519 }
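
/*
 * A rough summary of the diagnosis above (derived from the code, for
 * illustration): with new_cksum recomputed over the pages as they are now,
 *
 *   cksum_type != client_cksum_type  -> server replied with a different
 *                                       algorithm: protocol problem
 *   new_cksum == server_cksum        -> pages changed on the client after
 *                                       they were checksummed (mmap race)
 *   new_cksum == client_cksum        -> data corrupted in transit to the OST
 *   otherwise                        -> changed in transit AND locally;
 *                                       most likely an mmap race as well
 */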
1520
1521 /* Note rc enters this function as number of bytes transferred */
1522 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1523 {
1524         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1525         const lnet_process_id_t *peer =
1526                         &req->rq_import->imp_connection->c_peer;
1527         struct client_obd *cli = aa->aa_cli;
1528         struct ost_body *body;
1529         __u32 client_cksum = 0;
1530         ENTRY;
1531
1532         if (rc < 0 && rc != -EDQUOT) {
1533                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1534                 RETURN(rc);
1535         }
1536
1537         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1538         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1539         if (body == NULL) {
1540                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1541                 RETURN(-EPROTO);
1542         }
1543
1544         /* set/clear over quota flag for a uid/gid */
1545         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1546             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1547                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1548
1549                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1550                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1551                        body->oa.o_flags);
1552                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1553         }
1554
1555         osc_update_grant(cli, body);
1556
1557         if (rc < 0)
1558                 RETURN(rc);
1559
1560         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1561                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1562
1563         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1564                 if (rc > 0) {
1565                         CERROR("Unexpected +ve rc %d\n", rc);
1566                         RETURN(-EPROTO);
1567                 }
1568                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1569
1570                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1571                         RETURN(-EAGAIN);
1572
1573                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1574                     check_write_checksum(&body->oa, peer, client_cksum,
1575                                          body->oa.o_cksum, aa->aa_requested_nob,
1576                                          aa->aa_page_count, aa->aa_ppga,
1577                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1578                         RETURN(-EAGAIN);
1579
1580                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1581                                      aa->aa_page_count, aa->aa_ppga);
1582                 GOTO(out, rc);
1583         }
1584
1585         /* The rest of this function executes only for OST_READs */
1586
1587         /* if unwrap_bulk failed, return -EAGAIN to retry */
1588         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1589         if (rc < 0)
1590                 GOTO(out, rc = -EAGAIN);
1591
1592         if (rc > aa->aa_requested_nob) {
1593                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1594                        aa->aa_requested_nob);
1595                 RETURN(-EPROTO);
1596         }
1597
1598         if (rc != req->rq_bulk->bd_nob_transferred) {
1599                 CERROR("Unexpected rc %d (%d transferred)\n",
1600                        rc, req->rq_bulk->bd_nob_transferred);
1601                 RETURN(-EPROTO);
1602         }
1603
1604         if (rc < aa->aa_requested_nob)
1605                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1606
1607         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1608                 static int cksum_counter;
1609                 __u32      server_cksum = body->oa.o_cksum;
1610                 char      *via;
1611                 char      *router;
1612                 cksum_type_t cksum_type;
1613
1614                 cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
1615                                                body->oa.o_flags : 0);
1616                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1617                                                  aa->aa_ppga, OST_READ,
1618                                                  cksum_type);
1619
1620                 if (peer->nid == req->rq_bulk->bd_sender) {
1621                         via = router = "";
1622                 } else {
1623                         via = " via ";
1624                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1625                 }
1626
1627                 if (server_cksum == ~0 && rc > 0) {
1628                         CERROR("Protocol error: server %s set the 'checksum' "
1629                                "bit, but didn't send a checksum.  Not fatal, "
1630                                "but please notify on http://bugs.whamcloud.com/\n",
1631                                libcfs_nid2str(peer->nid));
1632                 } else if (server_cksum != client_cksum) {
1633                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1634                                            "%s%s%s inode "DFID" object "
1635                                            LPU64"/"LPU64" extent "
1636                                            "["LPU64"-"LPU64"]\n",
1637                                            req->rq_import->imp_obd->obd_name,
1638                                            libcfs_nid2str(peer->nid),
1639                                            via, router,
1640                                            body->oa.o_valid & OBD_MD_FLFID ?
1641                                                 body->oa.o_parent_seq : (__u64)0,
1642                                            body->oa.o_valid & OBD_MD_FLFID ?
1643                                                 body->oa.o_parent_oid : 0,
1644                                            body->oa.o_valid & OBD_MD_FLFID ?
1645                                                 body->oa.o_parent_ver : 0,
1646                                            body->oa.o_id,
1647                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1648                                                 body->oa.o_seq : (__u64)0,
1649                                            aa->aa_ppga[0]->off,
1650                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1651                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1652                                                                         1);
1653                         CERROR("client %x, server %x, cksum_type %x\n",
1654                                client_cksum, server_cksum, cksum_type);
1655                         cksum_counter = 0;
1656                         aa->aa_oa->o_cksum = client_cksum;
1657                         rc = -EAGAIN;
1658                 } else {
1659                         cksum_counter++;
1660                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1661                         rc = 0;
1662                 }
1663         } else if (unlikely(client_cksum)) {
1664                 static int cksum_missed;
1665
1666                 cksum_missed++;
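                /* (x & -x) == x only when x is a power of two, so this
                 * logs with exponential backoff: the 1st, 2nd, 4th, 8th,
                 * ... missed checksum */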
1667                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1668                         CERROR("Checksum %u requested from %s but not sent\n",
1669                                cksum_missed, libcfs_nid2str(peer->nid));
1670         } else {
1671                 rc = 0;
1672         }
1673 out:
1674         if (rc >= 0)
1675                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1676
1677         RETURN(rc);
1678 }
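
/*
 * Note on the -EAGAIN returns above: both osc_brw_internal() and
 * brw_interpret() feed this rc through osc_recoverable_error(); assuming
 * -EAGAIN is classed as recoverable there, checksum mismatches and
 * bulk-unwrap failures cause the bulk to be rebuilt and resent rather
 * than surfacing an error to the caller immediately.
 */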
1679
1680 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1681                             struct lov_stripe_md *lsm,
1682                             obd_count page_count, struct brw_page **pga,
1683                             struct obd_capa *ocapa)
1684 {
1685         struct ptlrpc_request *req;
1686         int                    rc;
1687         cfs_waitq_t            waitq;
1688         int                    resends = 0;
1689         struct l_wait_info     lwi;
1690
1691         ENTRY;
1692
1693         cfs_waitq_init(&waitq);
1694
1695 restart_bulk:
1696         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1697                                   page_count, pga, &req, ocapa, 0, resends);
1698         if (rc != 0)
1699                 return (rc);
1700
1701         rc = ptlrpc_queue_wait(req);
1702
1703         if (rc == -ETIMEDOUT && req->rq_resend) {
1704                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1705                 ptlrpc_req_finished(req);
1706                 goto restart_bulk;
1707         }
1708
1709         rc = osc_brw_fini_request(req, rc);
1710
1711         ptlrpc_req_finished(req);
1712         if (osc_recoverable_error(rc)) {
1713                 resends++;
1714                 if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
1715                         CERROR("too many resend retries, returning error\n");
1716                         RETURN(-EIO);
1717                 }
1718
1719                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1720                 l_wait_event(waitq, 0, &lwi);
1721
1722                 goto restart_bulk;
1723         }
1724
1725         RETURN(rc);
1726 }
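
/*
 * For illustration: the retry loop above backs off linearly.  The
 * l_wait_event() condition is always false, so each recoverable failure
 * sleeps the full LWI_TIMEOUT_INTR timeout -- cfs_time_seconds(resends),
 * i.e. roughly n seconds before the n-th resend -- until
 * client_should_resend() gives up and -EIO is returned.
 */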
1727
1728 int osc_brw_redo_request(struct ptlrpc_request *request,
1729                          struct osc_brw_async_args *aa)
1730 {
1731         struct ptlrpc_request *new_req;
1732         struct ptlrpc_request_set *set = request->rq_set;
1733         struct osc_brw_async_args *new_aa;
1734         struct osc_async_page *oap;
1735         int rc = 0;
1736         ENTRY;
1737
1738         if (!client_should_resend(aa->aa_resends, aa->aa_cli)) {
1739                 CERROR("too many resend retries, returning error\n");
1740                 RETURN(-EIO);
1741         }
1742
1743         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1744
1745         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1746                                         OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1747                                   aa->aa_cli, aa->aa_oa,
1748                                   NULL /* lsm unused by osc currently */,
1749                                   aa->aa_page_count, aa->aa_ppga,
1750                                   &new_req, aa->aa_ocapa, 0, 1);
1751         if (rc)
1752                 RETURN(rc);
1753
1754         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1755
1756         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1757                 if (oap->oap_request != NULL) {
1758                         LASSERTF(request == oap->oap_request,
1759                                  "request %p != oap_request %p\n",
1760                                  request, oap->oap_request);
1761                         if (oap->oap_interrupted) {
1762                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1763                                 ptlrpc_req_finished(new_req);
1764                                 RETURN(-EINTR);
1765                         }
1766                 }
1767         }
1768         /* New request takes over pga and oaps from old request.
1769          * Note that copying a list_head doesn't work; we have to move it. */
1770         aa->aa_resends++;
1771         new_req->rq_interpret_reply = request->rq_interpret_reply;
1772         new_req->rq_async_args = request->rq_async_args;
1773         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1774
1775         new_aa = ptlrpc_req_async_args(new_req);
1776
1777         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1778         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1779         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1780
1781         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1782                 if (oap->oap_request) {
1783                         ptlrpc_req_finished(oap->oap_request);
1784                         oap->oap_request = ptlrpc_request_addref(new_req);
1785                 }
1786         }
1787
1788         new_aa->aa_ocapa = aa->aa_ocapa;
1789         aa->aa_ocapa = NULL;
1790
1791         /* Using ptlrpc_set_add_req() here is safe because interpret
1792          * functions run in check_set context.  The only path on which
1793          * another thread can reach the request and return -EINTR is
1794          * protected by cl_loi_list_lock. */
1795         ptlrpc_set_add_req(set, new_req);
1796
1797         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1798
1799         DEBUG_REQ(D_INFO, new_req, "new request");
1800         RETURN(0);
1801 }
1802
1803 /*
1804  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1805  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1806  * fine for our small page arrays and doesn't require allocation.  it's an
1807  * insertion sort that swaps elements that are strides apart, shrinking the
1808  * stride down until it's 1 and the array is sorted.
1809  */
1810 static void sort_brw_pages(struct brw_page **array, int num)
1811 {
1812         int stride, i, j;
1813         struct brw_page *tmp;
1814
1815         if (num == 1)
1816                 return;
1817         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1818                 ;
1819
1820         do {
1821                 stride /= 3;
1822                 for (i = stride ; i < num ; i++) {
1823                         tmp = array[i];
1824                         j = i;
1825                         while (j >= stride && array[j - stride]->off > tmp->off) {
1826                                 array[j] = array[j - stride];
1827                                 j -= stride;
1828                         }
1829                         array[j] = tmp;
1830                 }
1831         } while (stride > 1);
1832 }
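
/*
 * Worked example (for illustration): with num == 10 the stride loop
 * computes 1, 4, 13 and stops at 13, so the do/while sorts with strides
 * 13/3 == 4 and then 4/3 == 1.  The final stride-1 pass is a plain
 * insertion sort over an almost-sorted array, which is what makes
 * shellsort cheap here.
 */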
1833
1834 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1835 {
1836         int count = 1;
1837         int offset;
1838         int i = 0;
1839
1840         LASSERT(pages > 0);
1841         offset = pg[i]->off & ~CFS_PAGE_MASK;
1842
1843         for (;;) {
1844                 pages--;
1845                 if (pages == 0)         /* that's all */
1846                         return count;
1847
1848                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1849                         return count;   /* doesn't end on page boundary */
1850
1851                 i++;
1852                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1853                 if (offset != 0)        /* doesn't start on page boundary */
1854                         return count;
1855
1856                 count++;
1857         }
1858 }
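
/*
 * Example (assuming 4KB CFS_PAGE_SIZE): brw_pages covering [0x800, 0x1000),
 * [0x1000, 0x2000) and [0x2000, 0x2400) form one unfragmented region --
 * only the first page may start and only the last may end inside a page --
 * so the function returns 3.  If the middle page instead ended at 0x1800,
 * the scan would stop there and return 2.
 */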
1859
1860 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1861 {
1862         struct brw_page **ppga;
1863         int i;
1864
1865         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1866         if (ppga == NULL)
1867                 return NULL;
1868
1869         for (i = 0; i < count; i++)
1870                 ppga[i] = pga + i;
1871         return ppga;
1872 }
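
/*
 * The pointer array built above lets sort_brw_pages() reorder pages by
 * offset without moving the caller's brw_page structs themselves.
 */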
1873
1874 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1875 {
1876         LASSERT(ppga != NULL);
1877         OBD_FREE(ppga, sizeof(*ppga) * count);
1878 }
1879
1880 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1881                    obd_count page_count, struct brw_page *pga,
1882                    struct obd_trans_info *oti)
1883 {
1884         struct obdo *saved_oa = NULL;
1885         struct brw_page **ppga, **orig;
1886         struct obd_import *imp = class_exp2cliimp(exp);
1887         struct client_obd *cli;
1888         int rc, page_count_orig;
1889         ENTRY;
1890
1891         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1892         cli = &imp->imp_obd->u.cli;
1893
1894         if (cmd & OBD_BRW_CHECK) {
1895                 /* The caller just wants to know if there's a chance that this
1896                  * I/O can succeed */
1897
1898                 if (imp->imp_invalid)
1899                         RETURN(-EIO);
1900                 RETURN(0);
1901         }
1902
1903         /* test_brw with a failed create can trip this, maybe others. */
1904         LASSERT(cli->cl_max_pages_per_rpc);
1905
1906         rc = 0;
1907
1908         orig = ppga = osc_build_ppga(pga, page_count);
1909         if (ppga == NULL)
1910                 RETURN(-ENOMEM);
1911         page_count_orig = page_count;
1912
1913         sort_brw_pages(ppga, page_count);
1914         while (page_count) {
1915                 obd_count pages_per_brw;
1916
1917                 if (page_count > cli->cl_max_pages_per_rpc)
1918                         pages_per_brw = cli->cl_max_pages_per_rpc;
1919                 else
1920                         pages_per_brw = page_count;
1921
1922                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1923
1924                 if (saved_oa != NULL) {
1925                         /* restore previously saved oa */
1926                         *oinfo->oi_oa = *saved_oa;
1927                 } else if (page_count > pages_per_brw) {
1928                         /* save a copy of oa (brw will clobber it) */
1929                         OBDO_ALLOC(saved_oa);
1930                         if (saved_oa == NULL)
1931                                 GOTO(out, rc = -ENOMEM);
1932                         *saved_oa = *oinfo->oi_oa;
1933                 }
1934
1935                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1936                                       pages_per_brw, ppga, oinfo->oi_capa);
1937
1938                 if (rc != 0)
1939                         break;
1940
1941                 page_count -= pages_per_brw;
1942                 ppga += pages_per_brw;
1943         }
1944
1945 out:
1946         osc_release_ppga(orig, page_count_orig);
1947
1948         if (saved_oa != NULL)
1949                 OBDO_FREE(saved_oa);
1950
1951         RETURN(rc);
1952 }
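
/*
 * Sketch of the chunking above (numbers for illustration only): with
 * cl_max_pages_per_rpc == 256, a 600-page request is issued as RPCs of at
 * most 256, 256 and 88 pages, each possibly trimmed further by
 * max_unfragmented_pages(); *oinfo->oi_oa is restored from saved_oa before
 * every chunk because osc_brw_internal() clobbers it.
 */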
1953
1954 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1955  * the dirty accounting.  Writeback completes or truncate happens before
1956  * writing starts.  Must be called with the loi lock held. */
1957 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1958                            int sent)
1959 {
1960         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1961 }
1962
1963
1964 /* This maintains the lists of pending pages to read/write for a given object
1965  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1966  * to quickly find objects that are ready to send an RPC. */
1967 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1968                          int cmd)
1969 {
1970         ENTRY;
1971
1972         if (lop->lop_num_pending == 0)
1973                 RETURN(0);
1974
1975         /* if we have an invalid import we want to drain the queued pages
1976          * by forcing them through rpcs that immediately fail and complete
1977          * the pages.  recovery relies on this to empty the queued pages
1978          * before canceling the locks and evicting down the llite pages */
1979         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1980                 RETURN(1);
1981
1982         /* stream rpcs in queue order as long as there is an urgent page
1983          * queued.  this is our cheap solution for good batching in the case
1984          * where writepage marks some random page in the middle of the file
1985          * as urgent because of, say, memory pressure */
1986         if (!cfs_list_empty(&lop->lop_urgent)) {
1987                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1988                 RETURN(1);
1989         }
1990
1991         if (cmd & OBD_BRW_WRITE) {
1992                 /* trigger a write rpc stream as long as there are dirtiers
1993                  * waiting for space.  as they're waiting, they're not going to
1994                  * create more pages to coalesce with what's waiting.. */
1995                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1996                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1997                         RETURN(1);
1998                 }
1999         }
2000         if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
2001                 RETURN(1);
2002
2003         RETURN(0);
2004 }
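
/*
 * In short, lop_makes_rpc() returns 1 when any of these holds (given at
 * least one pending page): the import is invalid (drain everything), an
 * urgent page is queued, a writer is blocked waiting for cache space
 * (writes only), or a full RPC's worth of pages (cl_max_pages_per_rpc)
 * is pending.
 */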
2005
2006 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2007 {
2008         struct osc_async_page *oap;
2009         ENTRY;
2010
2011         if (cfs_list_empty(&lop->lop_urgent))
2012                 RETURN(0);
2013
2014         oap = cfs_list_entry(lop->lop_urgent.next,
2015                              struct osc_async_page, oap_urgent_item);
2016
2017         if (oap->oap_async_flags & ASYNC_HP) {
2018                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2019                 RETURN(1);
2020         }
2021
2022         RETURN(0);
2023 }
2024
2025 static void on_list(cfs_list_t *item, cfs_list_t *list,
2026                     int should_be_on)
2027 {
2028         if (cfs_list_empty(item) && should_be_on)
2029                 cfs_list_add_tail(item, list);
2030         else if (!cfs_list_empty(item) && !should_be_on)
2031                 cfs_list_del_init(item);
2032 }
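
/*
 * on_list() is idempotent: it only adds an item that is absent or removes
 * one that is present, so callers can simply restate the desired
 * membership on every state change.
 */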
2033
2034 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2035  * can find pages to build into rpcs quickly */
2036 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2037 {
2038         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2039             lop_makes_hprpc(&loi->loi_read_lop)) {
2040                 /* HP rpc */
2041                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2042                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2043         } else {
2044                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2045                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2046                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2047                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2048         }
2049
2050         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2051                 loi->loi_write_lop.lop_num_pending);
2052
2053         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2054                 loi->loi_read_lop.lop_num_pending);
2055 }
2056
2057 static void lop_update_pending(struct client_obd *cli,
2058                                struct loi_oap_pages *lop, int cmd, int delta)
2059 {
2060         lop->lop_num_pending += delta;
2061         if (cmd & OBD_BRW_WRITE)
2062                 cli->cl_pending_w_pages += delta;
2063         else
2064                 cli->cl_pending_r_pages += delta;
2065 }
2066
2067 /**
2068  * this is called when a sync waiter receives an interruption.  Its job is to
2069  * get the caller woken as soon as possible.  If its page hasn't been put in an
2070  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2071  * desiring interruption which will forcefully complete the rpc once the rpc
2072  * has timed out.
2073  */
2074 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2075 {
2076         struct loi_oap_pages *lop;
2077         struct lov_oinfo *loi;
2078         int rc = -EBUSY;
2079         ENTRY;
2080
2081         LASSERT(!oap->oap_interrupted);
2082         oap->oap_interrupted = 1;
2083
2084         /* ok, it's been put in an rpc. only one oap gets a request reference */
2085         if (oap->oap_request != NULL) {
2086                 ptlrpc_mark_interrupted(oap->oap_request);
2087                 ptlrpcd_wake(oap->oap_request);
2088                 ptlrpc_req_finished(oap->oap_request);
2089                 oap->oap_request = NULL;
2090         }
2091
2092         /*
2093          * page completion may be called only if ->cpo_prep() method was
2094          * executed by osc_io_submit(), which also adds the page to the pending list
2095          */
2096         if (!cfs_list_empty(&oap->oap_pending_item)) {
2097                 cfs_list_del_init(&oap->oap_pending_item);
2098                 cfs_list_del_init(&oap->oap_urgent_item);
2099
2100                 loi = oap->oap_loi;
2101                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2102                         &loi->loi_write_lop : &loi->loi_read_lop;
2103                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2104                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2105                 rc = oap->oap_caller_ops->ap_completion(env,
2106                                           oap->oap_caller_data,
2107                                           oap->oap_cmd, NULL, -EINTR);
2108         }
2109
2110         RETURN(rc);
2111 }
2112
2113 /* this is trying to propagate async writeback errors back up to the
2114  * application.  When an async write fails we record the error code for later if
2115  * the app does an fsync.  As long as errors persist we force future rpcs to be
2116  * sync so that the app can get a sync error and break the cycle of queueing
2117  * pages for which writeback will fail. */
2118 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2119                            int rc)
2120 {
2121         if (rc) {
2122                 if (!ar->ar_rc)
2123                         ar->ar_rc = rc;
2124
2125                 ar->ar_force_sync = 1;
2126                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2127                 return;
2129         }
2130
2131         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2132                 ar->ar_force_sync = 0;
2133 }
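
/*
 * Example of the xid watermark (values illustrative): if a write fails
 * while the next xid to be assigned is 100, ar_min_xid becomes 100 and
 * ar_force_sync is set.  A later success for xid 99 changes nothing --
 * that request predates the failure -- but a success for any xid >= 100
 * proves a post-failure write got through, so force_sync is cleared.
 */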
2134
2135 void osc_oap_to_pending(struct osc_async_page *oap)
2136 {
2137         struct loi_oap_pages *lop;
2138
2139         if (oap->oap_cmd & OBD_BRW_WRITE)
2140                 lop = &oap->oap_loi->loi_write_lop;
2141         else
2142                 lop = &oap->oap_loi->loi_read_lop;
2143
2144         if (oap->oap_async_flags & ASYNC_HP)
2145                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2146         else if (oap->oap_async_flags & ASYNC_URGENT)
2147                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2148         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2149         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2150 }
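
/*
 * Queueing summary: ASYNC_HP pages go to the head of lop_urgent,
 * ASYNC_URGENT pages to its tail, and every page to the tail of
 * lop_pending -- so the urgent list serves high-priority work first while
 * lop_pending preserves submission order.
 */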
2151
2152 /* this must be called holding the loi list lock to give coverage to exit_cache,
2153  * async_flag maintenance, and oap_request */
2154 static void osc_ap_completion(const struct lu_env *env,
2155                               struct client_obd *cli, struct obdo *oa,
2156                               struct osc_async_page *oap, int sent, int rc)
2157 {
2158         __u64 xid = 0;
2159
2160         ENTRY;
2161         if (oap->oap_request != NULL) {
2162                 xid = ptlrpc_req_xid(oap->oap_request);
2163                 ptlrpc_req_finished(oap->oap_request);
2164                 oap->oap_request = NULL;
2165         }
2166
2167         cfs_spin_lock(&oap->oap_lock);
2168         oap->oap_async_flags = 0;
2169         cfs_spin_unlock(&oap->oap_lock);
2170         oap->oap_interrupted = 0;
2171
2172         if (oap->oap_cmd & OBD_BRW_WRITE) {
2173                 osc_process_ar(&cli->cl_ar, xid, rc);
2174                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2175         }
2176
2177         if (rc == 0 && oa != NULL) {
2178                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2179                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2180                 if (oa->o_valid & OBD_MD_FLMTIME)
2181                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2182                 if (oa->o_valid & OBD_MD_FLATIME)
2183                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2184                 if (oa->o_valid & OBD_MD_FLCTIME)
2185                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2186         }
2187
2188         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2189                                                 oap->oap_cmd, oa, rc);
2190
2191         /* cl_page_completion() drops PG_locked, so a new I/O on the page could
2192          * start; but OSC calls it under the loi list lock, so we can safely add
2193          * the oap back to pending */
2194         if (rc)
2195                 /* upper layer wants to leave the page on pending queue */
2196                 osc_oap_to_pending(oap);
2197         else
2198                 osc_exit_cache(cli, oap, sent);
2199         EXIT;
2200 }
2201
2202 static int brw_queue_work(const struct lu_env *env, void *data)
2203 {
2204         struct client_obd *cli = data;
2205
2206         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2207
2208         client_obd_list_lock(&cli->cl_loi_list_lock);
2209         osc_check_rpcs0(env, cli, 1);
2210         client_obd_list_unlock(&cli->cl_loi_list_lock);
2211         RETURN(0);
2212 }
2213
2214 static int brw_interpret(const struct lu_env *env,
2215                          struct ptlrpc_request *req, void *data, int rc)
2216 {
2217         struct osc_brw_async_args *aa = data;
2218         struct client_obd *cli;
2219         int async;
2220         ENTRY;
2221
2222         rc = osc_brw_fini_request(req, rc);
2223         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2224         if (osc_recoverable_error(rc)) {
2225                 rc = osc_brw_redo_request(req, aa);
2226                 if (rc == 0)
2227                         RETURN(0);
2228         }
2229
2230         if (aa->aa_ocapa) {
2231                 capa_put(aa->aa_ocapa);
2232                 aa->aa_ocapa = NULL;
2233         }
2234
2235         cli = aa->aa_cli;
2236         client_obd_list_lock(&cli->cl_loi_list_lock);
2237
2238         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2239          * is called so we know whether to go to sync BRWs or wait for more
2240          * RPCs to complete */
2241         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2242                 cli->cl_w_in_flight--;
2243         else
2244                 cli->cl_r_in_flight--;
2245
2246         async = cfs_list_empty(&aa->aa_oaps);
2247         if (!async) { /* from osc_send_oap_rpc() */
2248                 struct osc_async_page *oap, *tmp;
2249                 /* the caller may re-use the oap after the completion call so
2250                  * we need to clean it up a little */
2251                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2252                                              oap_rpc_item) {
2253                         cfs_list_del_init(&oap->oap_rpc_item);
2254                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2255                 }
2256                 OBDO_FREE(aa->aa_oa);
2257         } else { /* from async_internal() */
2258                 obd_count i;
2259                 for (i = 0; i < aa->aa_page_count; i++)
2260                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2261         }
2262         osc_wake_cache_waiters(cli);
2263         osc_check_rpcs0(env, cli, 1);
2264         client_obd_list_unlock(&cli->cl_loi_list_lock);
2265
2266         if (!async)
2267                 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2268                                   req->rq_bulk->bd_nob_transferred);
2269         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2270         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2271
2272         RETURN(rc);
2273 }
2274
2275 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2276                                             struct client_obd *cli,
2277                                             cfs_list_t *rpc_list,
2278                                             int page_count, int cmd)
2279 {
2280         struct ptlrpc_request *req;
2281         struct brw_page **pga = NULL;
2282         struct osc_brw_async_args *aa;
2283         struct obdo *oa = NULL;
2284         const struct obd_async_page_ops *ops = NULL;
2285         struct osc_async_page *oap;
2286         struct osc_async_page *tmp;
2287         struct cl_req *clerq = NULL;
2288         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2289         struct ldlm_lock *lock = NULL;
2290         struct cl_req_attr crattr;
2291         int i, rc, mpflag = 0;
2292
2293         ENTRY;
2294         LASSERT(!cfs_list_empty(rpc_list));
2295
2296         if (cmd & OBD_BRW_MEMALLOC)
2297                 mpflag = cfs_memory_pressure_get_and_set();
2298
2299         memset(&crattr, 0, sizeof crattr);
2300         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2301         if (pga == NULL)
2302                 GOTO(out, req = ERR_PTR(-ENOMEM));
2303
2304         OBDO_ALLOC(oa);
2305         if (oa == NULL)
2306                 GOTO(out, req = ERR_PTR(-ENOMEM));
2307
2308         i = 0;
2309         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2310                 struct cl_page *page = osc_oap2cl_page(oap);
2311                 if (ops == NULL) {
2312                         ops = oap->oap_caller_ops;
2313
2314                         clerq = cl_req_alloc(env, page, crt,
2315                                              1 /* only 1-object rpcs for
2316                                                 * now */);
2317                         if (IS_ERR(clerq))
2318                                 GOTO(out, req = (void *)clerq);
2319                         lock = oap->oap_ldlm_lock;
2320                 }
2321                 pga[i] = &oap->oap_brw_page;
2322                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2323                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2324                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2325                 i++;
2326                 cl_req_page_add(env, clerq, page);
2327         }
2328
2329         /* always get the data for the obdo for the rpc */
2330         LASSERT(ops != NULL);
2331         crattr.cra_oa = oa;
2332         crattr.cra_capa = NULL;
2333         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2334         if (lock) {
2335                 oa->o_handle = lock->l_remote_handle;
2336                 oa->o_valid |= OBD_MD_FLHANDLE;
2337         }
2338
2339         rc = cl_req_prep(env, clerq);
2340         if (rc != 0) {
2341                 CERROR("cl_req_prep failed: %d\n", rc);
2342                 GOTO(out, req = ERR_PTR(rc));
2343         }
2344
2345         sort_brw_pages(pga, page_count);
2346         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2347                                   pga, &req, crattr.cra_capa, 1, 0);
2348         if (rc != 0) {
2349                 CERROR("prep_req failed: %d\n", rc);
2350                 GOTO(out, req = ERR_PTR(rc));
2351         }
2352
2353         if (cmd & OBD_BRW_MEMALLOC)
2354                 req->rq_memalloc = 1;
2355
2356         /* Need to update the timestamps after the request is built in case
2357          * we race with setattr (locally or in queue at OST).  If OST gets
2358          * later setattr before earlier BRW (as determined by the request xid),
2359          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2360          * way to do this in a single call.  bug 10150 */
2361         cl_req_attr_set(env, clerq, &crattr,
2362                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2363
2364         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2365         aa = ptlrpc_req_async_args(req);
2366         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2367         cfs_list_splice(rpc_list, &aa->aa_oaps);
2368         CFS_INIT_LIST_HEAD(rpc_list);
2369         aa->aa_clerq = clerq;
2370 out:
2371         if (cmd & OBD_BRW_MEMALLOC)
2372                 cfs_memory_pressure_restore(mpflag);
2373
2374         capa_put(crattr.cra_capa);
2375         if (IS_ERR(req)) {
2376                 if (oa)
2377                         OBDO_FREE(oa);
2378                 if (pga)
2379                         OBD_FREE(pga, sizeof(*pga) * page_count);
2380                 /* this should happen rarely and is pretty bad: it makes the
2381                  * pending list no longer follow the dirty order */
2382                 client_obd_list_lock(&cli->cl_loi_list_lock);
2383                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2384                         cfs_list_del_init(&oap->oap_rpc_item);
2385
2386                         /* queued sync pages can be torn down while the pages
2387                          * were between the pending list and the rpc */
2388                         if (oap->oap_interrupted) {
2389                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2390                                 osc_ap_completion(env, cli, NULL, oap, 0,
2391                                                   oap->oap_count);
2392                                 continue;
2393                         }
2394                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2395                 }
2396                 if (clerq && !IS_ERR(clerq))
2397                         cl_req_completion(env, clerq, PTR_ERR(req));
2398         }
2399         RETURN(req);
2400 }
2401
2402 /**
2403  * prepare pages for ASYNC io and put pages in send queue.
2404  *
2405  * \param cmd OBD_BRW_* macros
2406  * \param lop pending pages
2407  *
2408  * \return zero if no pages were added to the send queue.
2409  * \return 1 if pages were successfully added to the send queue.
2410  * \return negative on errors.
2411  */
2412 static int
2413 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2414                  struct lov_oinfo *loi, int cmd,
2415                  struct loi_oap_pages *lop, pdl_policy_t pol)
2416 {
2417         struct ptlrpc_request *req;
2418         obd_count page_count = 0;
2419         struct osc_async_page *oap = NULL, *tmp;
2420         struct osc_brw_async_args *aa;
2421         const struct obd_async_page_ops *ops;
2422         CFS_LIST_HEAD(rpc_list);
2423         int srvlock = 0, mem_tight = 0;
2424         struct cl_object *clob = NULL;
2425         obd_off starting_offset = OBD_OBJECT_EOF;
2426         unsigned int ending_offset;
2427         int starting_page_off = 0;
2428         ENTRY;
2429
2430         /* ASYNC_HP pages first. At present, when a lock covering the pages
2431          * is about to be canceled, the pages it covers are sent out with
2432          * ASYNC_HP, and we have to send them out as soon as possible. */
2433         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2434                 if (oap->oap_async_flags & ASYNC_HP)
2435                         cfs_list_move(&oap->oap_pending_item, &rpc_list);
2436                 else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
2437                         /* only do this for writeback pages. */
2438                         cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
2439                 if (++page_count >= cli->cl_max_pages_per_rpc)
2440                         break;
2441         }
2442         cfs_list_splice_init(&rpc_list, &lop->lop_pending);
2443         page_count = 0;
2444
2445         /* first we find the pages we're allowed to work with */
2446         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2447                                      oap_pending_item) {
2448                 ops = oap->oap_caller_ops;
2449
2450                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2451                          "magic 0x%x\n", oap, oap->oap_magic);
2452
2453                 if (clob == NULL) {
2454                         /* pin object in memory, so that completion call-backs
2455                          * can be safely called under client_obd_list lock. */
2456                         clob = osc_oap2cl_page(oap)->cp_obj;
2457                         cl_object_get(clob);
2458                 }
2459
2460                 if (page_count != 0 &&
2461                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2462                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2463                                " oap %p, page %p, srvlock %u\n",
2464                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2465                         break;
2466                 }
2467
2468                 /* If there is a gap at the start of this page, it can't merge
2469                  * with any previous page, so we'll hand the network a
2470                  * "fragmented" page array that it can't transfer in 1 RDMA */
2471                 if (oap->oap_obj_off < starting_offset) {
2472                         if (starting_page_off != 0)
2473                                 break;
2474
2475                         starting_page_off = oap->oap_page_off;
2476                         starting_offset = oap->oap_obj_off + starting_page_off;
2477                 } else if (oap->oap_page_off != 0)
2478                         break;
2479
2480                 /* in llite, being 'ready' equates to the page being locked
2481                  * until completion unlocks it.  commit_write submits a page
2482                  * as not ready because its unlock will happen unconditionally
2483                  * as the call returns.  if we race with commit_write giving
2484                  * us that page we don't want to create a hole in the page
2485                  * stream, so we stop and leave the rpc to be fired by
2486                  * another dirtier or kupdated interval (the not ready page
2487                  * will still be on the dirty list).  we could call in
2488                  * at the end of ll_file_write to process the queue again. */
2489                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2490                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2491                                                     cmd);
2492                         if (rc < 0)
2493                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2494                                                 "instead of ready\n", oap,
2495                                                 oap->oap_page, rc);
2496                         switch (rc) {
2497                         case -EAGAIN:
2498                                 /* llite is telling us that the page is still
2499                                  * in commit_write and that we should try
2500                                  * and put it in an rpc again later.  we
2501                                  * break out of the loop so we don't create
2502                                  * a hole in the sequence of pages in the rpc
2503                                  * stream.*/
2504                                 oap = NULL;
2505                                 break;
2506                         case -EINTR:
2507                         /* the io isn't needed; tell the checks
2508                          * below to complete the rpc with EINTR */
2509                                 cfs_spin_lock(&oap->oap_lock);
2510                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2511                                 cfs_spin_unlock(&oap->oap_lock);
2512                                 oap->oap_count = -EINTR;
2513                                 break;
2514                         case 0:
2515                                 cfs_spin_lock(&oap->oap_lock);
2516                                 oap->oap_async_flags |= ASYNC_READY;
2517                                 cfs_spin_unlock(&oap->oap_lock);
2518                                 break;
2519                         default:
2520                                 LASSERTF(0, "oap %p page %p returned %d "
2521                                             "from make_ready\n", oap,
2522                                             oap->oap_page, rc);
2523                                 break;
2524                         }
2525                 }
2526                 if (oap == NULL)
2527                         break;
2528
2529                 /* take the page out of our book-keeping */
2530                 cfs_list_del_init(&oap->oap_pending_item);
2531                 lop_update_pending(cli, lop, cmd, -1);
2532                 cfs_list_del_init(&oap->oap_urgent_item);
2533
2534                 /* ask the caller for the size of the io as the rpc leaves. */
2535                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2536                         oap->oap_count =
2537                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2538                                                       cmd);
2539                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2540                 }
2541                 if (oap->oap_count <= 0) {
2542                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2543                                oap->oap_count);
2544                         osc_ap_completion(env, cli, NULL,
2545                                           oap, 0, oap->oap_count);
2546                         continue;
2547                 }
2548
2549                 /* now put the page back in our accounting */
2550                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2551                 if (page_count++ == 0)
2552                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2553
2554                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2555                         mem_tight = 1;
2556
2557                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2558                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2559                  * have the same alignment as the initial writes that allocated
2560                  * extents on the server. */
2561                 ending_offset = oap->oap_obj_off + oap->oap_page_off +
2562                                 oap->oap_count;
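                /* PTLRPC_MAX_BRW_SIZE is a power of two, so this mask test
                 * is true exactly when ending_offset is a multiple of it */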
2563                 if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
2564                         break;
2565
2566                 if (page_count >= cli->cl_max_pages_per_rpc)
2567                         break;
2568
2569                 /* If there is a gap at the end of this page, it can't merge
2570                  * with any subsequent pages, so we'll hand the network a
2571                  * "fragmented" page array that it can't transfer in 1 RDMA */
2572                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2573                         break;
2574         }
2575
2576         osc_wake_cache_waiters(cli);
2577
2578         loi_list_maint(cli, loi);
2579
2580         client_obd_list_unlock(&cli->cl_loi_list_lock);
2581
2582         if (clob != NULL)
2583                 cl_object_put(env, clob);
2584
2585         if (page_count == 0) {
2586                 client_obd_list_lock(&cli->cl_loi_list_lock);
2587                 RETURN(0);
2588         }
2589
2590         req = osc_build_req(env, cli, &rpc_list, page_count,
2591                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2592         if (IS_ERR(req)) {
2593                 LASSERT(cfs_list_empty(&rpc_list));
2594                 loi_list_maint(cli, loi);
2595                 RETURN(PTR_ERR(req));
2596         }
2597
2598         aa = ptlrpc_req_async_args(req);
2599
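        /* keep only the offset within the current PTLRPC_MAX_BRW_SIZE chunk;
         * the histograms below record where inside a max-sized RPC window
         * this transfer begins */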
2600         starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
2601         if (cmd == OBD_BRW_READ) {
2602                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2603                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2604                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2605                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2606         } else {
2607                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2608                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2609                                  cli->cl_w_in_flight);
2610                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2611                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2612         }
2613
2614         client_obd_list_lock(&cli->cl_loi_list_lock);
2615
2616         if (cmd == OBD_BRW_READ)
2617                 cli->cl_r_in_flight++;
2618         else
2619                 cli->cl_w_in_flight++;
2620
2621         /* queued sync pages can be torn down while the pages
2622          * were between the pending list and the rpc */
2623         tmp = NULL;
2624         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2625                 /* only one oap gets a request reference */
2626                 if (tmp == NULL)
2627                         tmp = oap;
2628                 if (oap->oap_interrupted && !req->rq_intr) {
2629                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2630                                oap, req);
2631                         ptlrpc_mark_interrupted(req);
2632                 }
2633         }
2634         if (tmp != NULL)
2635                 tmp->oap_request = ptlrpc_request_addref(req);
2636
2637         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2638                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2639
2640         req->rq_interpret_reply = brw_interpret;
2641
2642         /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
2643          *      CPU/NUMA node the majority of pages were allocated on, and try
2644          *      to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
2645          *      to reduce cross-CPU memory traffic.
2646          *
2647          *      But on the other hand, we expect that multiple ptlrpcd threads
2648          *      and the initial write sponsor can run in parallel, especially
2649          *      when data checksum is enabled, which is CPU-bound operation and
2650          *      single ptlrpcd thread cannot process in time. So more ptlrpcd
2651          *      threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
2652          */
2653         ptlrpcd_add_req(req, pol, -1);
2654         RETURN(1);
2655 }
2656
2657 #define LOI_DEBUG(LOI, STR, args...)                                     \
2658         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2659                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2660                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2661                (LOI)->loi_write_lop.lop_num_pending,                     \
2662                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2663                (LOI)->loi_read_lop.lop_num_pending,                      \
2664                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2665                args)
2666
2667 /* This is called by osc_check_rpcs() to find which objects have pages that
2668  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2669 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2670 {
2671         ENTRY;
2672
2673         /* First return objects that have blocked locks so that they
2674          * will be flushed quickly and other clients can get the lock,
2675          * then objects which have pages ready to be stuffed into RPCs */
2676         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2677                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2678                                       struct lov_oinfo, loi_hp_ready_item));
2679         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2680                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2681                                       struct lov_oinfo, loi_ready_item));
2682
2683         /* then if we have cache waiters, return all objects with queued
2684          * writes.  This is especially important when many small files
2685          * have filled up the cache and not been fired into rpcs because
2686          * they don't pass the nr_pending/object threshold */
2687         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2688             !cfs_list_empty(&cli->cl_loi_write_list))
2689                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2690                                       struct lov_oinfo, loi_write_item));
2691
2692         /* then return all queued objects when we have an invalid import
2693          * so that they get flushed */
2694         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2695                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2696                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2697                                               struct lov_oinfo,
2698                                               loi_write_item));
2699                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2700                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2701                                               struct lov_oinfo, loi_read_item));
2702         }
2703         RETURN(NULL);
2704 }
2705
2706 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2707 {
2708         struct osc_async_page *oap;
2709         int hprpc = 0;
2710
2711         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2712                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2713                                      struct osc_async_page, oap_urgent_item);
2714                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2715         }
2716
2717         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2718                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2719                                      struct osc_async_page, oap_urgent_item);
2720                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2721         }
2722
2723         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2724 }
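
/*
 * For example, with cl_max_rpcs_in_flight == 8 and an ASYNC_HP page at
 * the head of an urgent list, the effective limit becomes 9: one extra
 * slot so a high-priority RPC can still be sent when the normal quota
 * is already exhausted.
 */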
2725
2726 /* called with the loi list lock held */
2727 static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
2728 {
2729         struct lov_oinfo *loi;
2730         int rc = 0, race_counter = 0;
2731         pdl_policy_t pol;
2732         ENTRY;
2733
2734         pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;
2735
2736         while ((loi = osc_next_loi(cli)) != NULL) {
2737                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2738
2739                 if (osc_max_rpc_in_flight(cli, loi))
2740                         break;
2741
2742                 /* attempt some read/write balancing by alternating between
2743                  * reads and writes in an object.  The makes_rpc checks here
2744                  * would be redundant if we were getting read/write work items
2745                  * instead of objects.  We don't want send_oap_rpc to drain a
2746                  * partial read pending queue when we're given this object to
2747                  * do write io on while there are cache waiters */
2748                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2749                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2750                                               &loi->loi_write_lop, pol);
2751                         if (rc < 0) {
2752                                 CERROR("Write request failed with %d\n", rc);
2753
2754                 /* osc_send_oap_rpc failed, mostly because of
2755                  * memory pressure.
2756                  *
2757                  * We can't simply break out here.  If:
2758                  *  - a page was submitted by osc_io_submit,
2759                  *    so it is locked;
2760                  *  - no request is in flight; and
2761                  *  - no subsequent request is made,
2762                  * then the system ends up live-locked,
2763                  * because there is no further chance to call
2764                  * osc_io_unplug() or osc_check_rpcs().
2765                  * pdflush can't help in this case either,
2766                  * because it may be blocked grabbing
2767                  * the page lock we just mentioned.
2768                  *
2769                  * Anyway, continue to drain pages. */
2770                                 /* break; */
2771                         }
2772
2773                         if (rc > 0)
2774                                 race_counter = 0;
2775                         else if (rc == 0)
2776                                 race_counter++;
2777                 }
2778                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2779                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2780                                               &loi->loi_read_lop, pol);
2781                         if (rc < 0)
2782                                 CERROR("Read request failed with %d\n", rc);
2783
2784                         if (rc > 0)
2785                                 race_counter = 0;
2786                         else if (rc == 0)
2787                                 race_counter++;
2788                 }
2789
2790                 /* attempt some inter-object balancing by issuing rpcs
2791                  * for each object in turn */
2792                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2793                         cfs_list_del_init(&loi->loi_hp_ready_item);
2794                 if (!cfs_list_empty(&loi->loi_ready_item))
2795                         cfs_list_del_init(&loi->loi_ready_item);
2796                 if (!cfs_list_empty(&loi->loi_write_item))
2797                         cfs_list_del_init(&loi->loi_write_item);
2798                 if (!cfs_list_empty(&loi->loi_read_item))
2799                         cfs_list_del_init(&loi->loi_read_item);
2800
2801                 loi_list_maint(cli, loi);
2802
2803                 /* send_oap_rpc fails with 0 when make_ready tells it to
2804                  * back off.  llite's make_ready does this when it tries
2805                  * to lock a page queued for write that is already locked.
2806                  * we want to try sending rpcs from many objects, but we
2807                  * don't want to spin failing with 0.  */
2808                 if (race_counter == 10)
2809                         break;
2810         }
2811 }
2812
2813 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2814 {
2815         osc_check_rpcs0(env, cli, 0);
2816 }
2817
2818 /* we're trying to queue a page in the osc so we're subject to the
2819  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2820  * If the osc's queued pages are already at that limit, then we want to sleep
2821  * until there is space in the osc's queue for us.  We also may be waiting for
2822  * write credits from the OST if there are RPCs in flight that may return some
2823  * before we fall back to sync writes.
2824  *
2825  * We need this to know our allocation was granted in the presence of signals */
2826 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2827 {
2828         int rc;
2829         ENTRY;
2830         client_obd_list_lock(&cli->cl_loi_list_lock);
2831         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2832         client_obd_list_unlock(&cli->cl_loi_list_lock);
2833         RETURN(rc);
2834 }
2835
2836 /**
2837  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2838  * is available.
2839  */
2840 int osc_enter_cache_try(const struct lu_env *env,
2841                         struct client_obd *cli, struct lov_oinfo *loi,
2842                         struct osc_async_page *oap, int transient)
2843 {
2844         int has_grant;
2845
2846         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2847         if (has_grant) {
2848                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2849                 if (transient) {
2850                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2851                         cfs_atomic_inc(&obd_dirty_transit_pages);
2852                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2853                 }
2854         }
2855         return has_grant;
2856 }
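
/*
 * Usage sketch: with cl_loi_list_lock held, try the non-blocking path
 * first and fall back to the blocking osc_enter_cache() below only when
 * no grant is immediately available:
 *
 *	if (!osc_enter_cache_try(env, cli, loi, oap, 0))
 *		rc = osc_enter_cache(env, cli, loi, oap);
 */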
2857
2858 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2859  * grant or cache space. */
2860 static int osc_enter_cache(const struct lu_env *env,
2861                            struct client_obd *cli, struct lov_oinfo *loi,
2862                            struct osc_async_page *oap)
2863 {
2864         struct osc_cache_waiter ocw;
2865         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2866
2867         ENTRY;
2868
2869         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2870                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2871                cli->cl_dirty_max, obd_max_dirty_pages,
2872                cli->cl_lost_grant, cli->cl_avail_grant);
2873
2874         /* force the caller to try sync io.  this can jump the list
2875          * of queued writes and create a discontiguous rpc stream */
2876         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
2877             cli->cl_dirty_max < CFS_PAGE_SIZE     ||
2878             cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
2879                 RETURN(-EDQUOT);
2880
2881         /* Hopefully normal case - cache space and write credits available */
2882         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2883             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2884             osc_enter_cache_try(env, cli, loi, oap, 0))
2885                 RETURN(0);
2886
2887         /* It is safe to block as a cache waiter as long as there is grant
2888          * space available or the hope of additional grant being returned
2889          * when an in flight write completes.  Using the write back cache
2890          * if possible is preferable to sending the data synchronously
2891          * because write pages can then be merged into large requests.
2892          * The addition of this cache waiter will cause pending write
2893          * pages to be sent immediately. */
2894         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2895                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2896                 cfs_waitq_init(&ocw.ocw_waitq);
2897                 ocw.ocw_oap = oap;
2898                 ocw.ocw_rc = 0;
2899
2900                 loi_list_maint(cli, loi);
2901                 osc_check_rpcs(env, cli);
2902                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2903
2904                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2905                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2906
2907                 client_obd_list_lock(&cli->cl_loi_list_lock);
2908                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2909                         cfs_list_del(&ocw.ocw_entry);
2910                         RETURN(-EINTR);
2911                 }
2912                 RETURN(ocw.ocw_rc);
2913         }
2914
2915         RETURN(-EDQUOT);
2916 }
2917
2918
2919 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2920                         struct lov_oinfo *loi, cfs_page_t *page,
2921                         obd_off offset, const struct obd_async_page_ops *ops,
2922                         void *data, void **res, int nocache,
2923                         struct lustre_handle *lockh)
2924 {
2925         struct osc_async_page *oap;
2926
2927         ENTRY;
2928
2929         if (!page)
2930                 return cfs_size_round(sizeof(*oap));
2931
2932         oap = *res;
2933         oap->oap_magic = OAP_MAGIC;
2934         oap->oap_cli = &exp->exp_obd->u.cli;
2935         oap->oap_loi = loi;
2936
2937         oap->oap_caller_ops = ops;
2938         oap->oap_caller_data = data;
2939
2940         oap->oap_page = page;
2941         oap->oap_obj_off = offset;
2942         if (!client_is_remote(exp) &&
2943             cfs_capable(CFS_CAP_SYS_RESOURCE))
2944                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2945
2946         LASSERT(!(offset & ~CFS_PAGE_MASK));
2947
2948         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2949         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2950         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2951         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2952
2953         cfs_spin_lock_init(&oap->oap_lock);
2954         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2955         RETURN(0);
2956 }
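
/*
 * A sketch of the two-phase calling convention above: a first call with
 * page == NULL only reports how much space the caller must reserve for
 * the oap:
 *
 *	size = osc_prep_async_page(exp, lsm, loi, NULL, 0, ops, data,
 *	                           NULL, 0, NULL);
 *	... reserve cfs_size_round()ed space and point *res at it, then ...
 *	rc = osc_prep_async_page(exp, lsm, loi, page, offset, ops, data,
 *	                         &res, nocache, lockh);
 */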
2957
2958 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
2959                        struct lov_stripe_md *lsm, struct lov_oinfo *loi,
2960                        struct osc_async_page *oap, int cmd, int off,
2961                        int count, obd_flag brw_flags, enum async_flags async_flags)
2962 {
2963         struct client_obd *cli = &exp->exp_obd->u.cli;
2964         int rc = 0;
2965         ENTRY;
2966
2967         if (oap->oap_magic != OAP_MAGIC)
2968                 RETURN(-EINVAL);
2969
2970         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2971                 RETURN(-EIO);
2972
2973         if (!cfs_list_empty(&oap->oap_pending_item) ||
2974             !cfs_list_empty(&oap->oap_urgent_item) ||
2975             !cfs_list_empty(&oap->oap_rpc_item))
2976                 RETURN(-EBUSY);
2977
2978         /* check if the file's owner/group is over quota */
2979         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2980                 struct cl_object *obj;
2981                 struct cl_attr    attr; /* XXX put attr into thread info */
2982                 unsigned int qid[MAXQUOTAS];
2983
2984                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2985
2986                 cl_object_attr_lock(obj);
2987                 rc = cl_object_attr_get(env, obj, &attr);
2988                 cl_object_attr_unlock(obj);
2989
2990                 qid[USRQUOTA] = attr.cat_uid;
2991                 qid[GRPQUOTA] = attr.cat_gid;
2992                 if (rc == 0 &&
2993                     osc_quota_chkdq(cli, qid) == NO_QUOTA)
2994                         rc = -EDQUOT;
2995                 if (rc)
2996                         RETURN(rc);
2997         }
2998
2999         if (loi == NULL)
3000                 loi = lsm->lsm_oinfo[0];
3001
3002         client_obd_list_lock(&cli->cl_loi_list_lock);
3003
3004         LASSERT(off + count <= CFS_PAGE_SIZE);
3005         oap->oap_cmd = cmd;
3006         oap->oap_page_off = off;
3007         oap->oap_count = count;
3008         oap->oap_brw_flags = brw_flags;
3009         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3010         if (cfs_memory_pressure_get())
3011                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3012         cfs_spin_lock(&oap->oap_lock);
3013         oap->oap_async_flags = async_flags;
3014         cfs_spin_unlock(&oap->oap_lock);
3015
3016         if (cmd & OBD_BRW_WRITE) {
3017                 rc = osc_enter_cache(env, cli, loi, oap);
3018                 if (rc) {
3019                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3020                         RETURN(rc);
3021                 }
3022         }
3023
3024         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3025                   cmd);
3026
3027         osc_oap_to_pending(oap);
3028         loi_list_maint(cli, loi);
3029         if (!osc_max_rpc_in_flight(cli, loi) &&
3030             lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
3031                 LASSERT(cli->cl_writeback_work != NULL);
3032                 rc = ptlrpcd_queue_work(cli->cl_writeback_work);
3033
3034                 CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
3035                        cli, rc);
3036         }
3037         client_obd_list_unlock(&cli->cl_loi_list_lock);
3038
3039         RETURN(0);
3040 }
3041
3042 /* aka (~was & now & flag), but this is more clear :) */
3043 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
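/* e.g. SETTING(0, ASYNC_READY, ASYNC_READY) is true, while
 * SETTING(ASYNC_READY, ASYNC_READY, ASYNC_READY) is false: the flag
 * must be newly set in 'now', not merely present in both */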
3044
3045 int osc_set_async_flags_base(struct client_obd *cli,
3046                              struct lov_oinfo *loi, struct osc_async_page *oap,
3047                              obd_flag async_flags)
3048 {
3049         struct loi_oap_pages *lop;
3050         int flags = 0;
3051         ENTRY;
3052
3053         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3054
3055         if (oap->oap_cmd & OBD_BRW_WRITE) {
3056                 lop = &loi->loi_write_lop;
3057         } else {
3058                 lop = &loi->loi_read_lop;
3059         }
3060
3061         if ((oap->oap_async_flags & async_flags) == async_flags)
3062                 RETURN(0);
3063
3064         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3065                 flags |= ASYNC_READY;
3066
3067         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3068             cfs_list_empty(&oap->oap_rpc_item)) {
3069                 if (oap->oap_async_flags & ASYNC_HP)
3070                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3071                 else
3072                         cfs_list_add_tail(&oap->oap_urgent_item,
3073                                           &lop->lop_urgent);
3074                 flags |= ASYNC_URGENT;
3075                 loi_list_maint(cli, loi);
3076         }
3077         cfs_spin_lock(&oap->oap_lock);
3078         oap->oap_async_flags |= flags;
3079         cfs_spin_unlock(&oap->oap_lock);
3080
3081         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3082                         oap->oap_async_flags);
3083         RETURN(0);
3084 }
3085
3086 int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
3087                             struct lov_oinfo *loi, struct osc_async_page *oap)
3088 {
3089         struct client_obd *cli = &exp->exp_obd->u.cli;
3090         struct loi_oap_pages *lop;
3091         int rc = 0;
3092         ENTRY;
3093
3094         if (oap->oap_magic != OAP_MAGIC)
3095                 RETURN(-EINVAL);
3096
3097         if (loi == NULL)
3098                 loi = lsm->lsm_oinfo[0];
3099
3100         if (oap->oap_cmd & OBD_BRW_WRITE) {
3101                 lop = &loi->loi_write_lop;
3102         } else {
3103                 lop = &loi->loi_read_lop;
3104         }
3105
3106         client_obd_list_lock(&cli->cl_loi_list_lock);
3107
3108         if (!cfs_list_empty(&oap->oap_rpc_item))
3109                 GOTO(out, rc = -EBUSY);
3110
3111         osc_exit_cache(cli, oap, 0);
3112         osc_wake_cache_waiters(cli);
3113
3114         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3115                 cfs_list_del_init(&oap->oap_urgent_item);
3116                 cfs_spin_lock(&oap->oap_lock);
3117                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3118                 cfs_spin_unlock(&oap->oap_lock);
3119         }
3120         if (!cfs_list_empty(&oap->oap_pending_item)) {
3121                 cfs_list_del_init(&oap->oap_pending_item);
3122                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3123         }
3124         loi_list_maint(cli, loi);
3125         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3126 out:
3127         client_obd_list_unlock(&cli->cl_loi_list_lock);
3128         RETURN(rc);
3129 }
3130
3131 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3132                                         struct ldlm_enqueue_info *einfo)
3133 {
3134         void *data = einfo->ei_cbdata;
3135         int set = 0;
3136
3137         LASSERT(lock != NULL);
3138         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3139         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3140         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3141         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3142
3143         lock_res_and_lock(lock);
3144         cfs_spin_lock(&osc_ast_guard);
3145
3146         if (lock->l_ast_data == NULL)
3147                 lock->l_ast_data = data;
3148         if (lock->l_ast_data == data)
3149                 set = 1;
3150
3151         cfs_spin_unlock(&osc_ast_guard);
3152         unlock_res_and_lock(lock);
3153
3154         return set;
3155 }
3156
3157 static int osc_set_data_with_check(struct lustre_handle *lockh,
3158                                    struct ldlm_enqueue_info *einfo)
3159 {
3160         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3161         int set = 0;
3162
3163         if (lock != NULL) {
3164                 set = osc_set_lock_data_with_check(lock, einfo);
3165                 LDLM_LOCK_PUT(lock);
3166         } else
3167                 CERROR("lockh %p, data %p - client evicted?\n",
3168                        lockh, einfo->ei_cbdata);
3169         return set;
3170 }
3171
3172 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3173                              ldlm_iterator_t replace, void *data)
3174 {
3175         struct ldlm_res_id res_id;
3176         struct obd_device *obd = class_exp2obd(exp);
3177
3178         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3179         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3180         return 0;
3181 }
3182
3183 /* find any ldlm lock of the inode in osc
3184  * return 0     if no lock is found
3185  *        1     if a lock is found
3186  *      < 0     on error */
3187 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3188                            ldlm_iterator_t replace, void *data)
3189 {
3190         struct ldlm_res_id res_id;
3191         struct obd_device *obd = class_exp2obd(exp);
3192         int rc = 0;
3193
3194         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3195         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3196         if (rc == LDLM_ITER_STOP)
3197                 return(1);
3198         if (rc == LDLM_ITER_CONTINUE)
3199                 return(0);
3200         return(rc);
3201 }
3202
3203 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3204                             obd_enqueue_update_f upcall, void *cookie,
3205                             int *flags, int agl, int rc)
3206 {
3207         int intent = *flags & LDLM_FL_HAS_INTENT;
3208         ENTRY;
3209
3210         if (intent) {
3211                 /* The request was created before ldlm_cli_enqueue call. */
3212                 if (rc == ELDLM_LOCK_ABORTED) {
3213                         struct ldlm_reply *rep;
3214                         rep = req_capsule_server_get(&req->rq_pill,
3215                                                      &RMF_DLM_REP);
3216
3217                         LASSERT(rep != NULL);
3218                         if (rep->lock_policy_res1)
3219                                 rc = rep->lock_policy_res1;
3220                 }
3221         }
3222
3223         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
3224             (rc == 0)) {
3225                 *flags |= LDLM_FL_LVB_READY;
3226                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3227                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3228         }
3229
3230         /* Call the update callback. */
3231         rc = (*upcall)(cookie, rc);
3232         RETURN(rc);
3233 }
3234
3235 static int osc_enqueue_interpret(const struct lu_env *env,
3236                                  struct ptlrpc_request *req,
3237                                  struct osc_enqueue_args *aa, int rc)
3238 {
3239         struct ldlm_lock *lock;
3240         struct lustre_handle handle;
3241         __u32 mode;
3242         struct ost_lvb *lvb;
3243         __u32 lvb_len;
3244         int *flags = aa->oa_flags;
3245
3246         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3247          * might be freed anytime after lock upcall has been called. */
3248         lustre_handle_copy(&handle, aa->oa_lockh);
3249         mode = aa->oa_ei->ei_mode;
3250
3251         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3252          * be valid. */
3253         lock = ldlm_handle2lock(&handle);
3254
3255         /* Take an additional reference so that a blocking AST that
3256          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3257          * to arrive after an upcall has been executed by
3258          * osc_enqueue_fini(). */
3259         ldlm_lock_addref(&handle, mode);
3260
3261         /* Let the CP AST grant the lock first. */
3262         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3263
3264         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
3265                 lvb = NULL;
3266                 lvb_len = 0;
3267         } else {
3268                 lvb = aa->oa_lvb;
3269                 lvb_len = sizeof(*aa->oa_lvb);
3270         }
3271
3272         /* Complete the lock acquisition procedure. */
3273         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3274                                    mode, flags, lvb, lvb_len, &handle, rc);
3275         /* Complete osc stuff. */
3276         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
3277                               flags, aa->oa_agl, rc);
3278
3279         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3280
3281         /* Release the lock for async request. */
3282         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3283                 /*
3284                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3285                  * not already released by
3286                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3287                  */
3288                 ldlm_lock_decref(&handle, mode);
3289
3290         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3291                  aa->oa_lockh, req, aa);
3292         ldlm_lock_decref(&handle, mode);
3293         LDLM_LOCK_PUT(lock);
3294         return rc;
3295 }
3296
3297 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3298                         struct lov_oinfo *loi, int flags,
3299                         struct ost_lvb *lvb, __u32 mode, int rc)
3300 {
3301         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3302
3303         if (rc == ELDLM_OK) {
3304                 __u64 tmp;
3305
3306                 LASSERT(lock != NULL);
3307                 loi->loi_lvb = *lvb;
3308                 tmp = loi->loi_lvb.lvb_size;
3309                 /* Extend KMS up to the end of this lock and no further
3310                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3311                 if (tmp > lock->l_policy_data.l_extent.end)
3312                         tmp = lock->l_policy_data.l_extent.end + 1;
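                /* e.g. (hypothetical numbers) a lock on [0, 4095] with an
                 * lvb_size of 10000 caps tmp, and hence kms, at 4096 */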
3313                 if (tmp >= loi->loi_kms) {
3314                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3315                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3316                         loi_kms_set(loi, tmp);
3317                 } else {
3318                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3319                                    LPU64"; leaving kms="LPU64", end="LPU64,
3320                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3321                                    lock->l_policy_data.l_extent.end);
3322                 }
3323                 ldlm_lock_allow_match(lock);
3324         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3325                 LASSERT(lock != NULL);
3326                 loi->loi_lvb = *lvb;
3327                 ldlm_lock_allow_match(lock);
3328                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3329                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3330                 rc = ELDLM_OK;
3331         }
3332
3333         if (lock != NULL) {
3334                 if (rc != ELDLM_OK)
3335                         ldlm_lock_fail_match(lock, rc);
3336
3337                 LDLM_LOCK_PUT(lock);
3338         }
3339 }
3340 EXPORT_SYMBOL(osc_update_enqueue);
3341
3342 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
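
/*
 * PTLRPCD_SET is a sentinel rather than a real request set: passing it
 * as @rqset asks for the RPC to be handed directly to a ptlrpcd thread.
 * A usage sketch (arguments abbreviated from the prototype below):
 *
 *	rc = osc_enqueue_base(exp, &res_id, &flags, &policy, &lvb,
 *	                      kms_valid, upcall, cookie, &einfo, &lockh,
 *	                      PTLRPCD_SET, 1, 0);
 *
 * See the rqset == PTLRPCD_SET test in osc_enqueue_base().
 */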
3343
3344 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
3345  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
3346  * with other synchronous requests; however, keeping some locks while trying to
3347  * obtain others may take a considerable amount of time in case of OST failure,
3348  * and when other sync requests cannot get a lock released from a client, that
3349  * client is excluded from the cluster -- such scenarios make life difficult --
3350  * so release locks just after they are obtained. */
3351 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3352                      int *flags, ldlm_policy_data_t *policy,
3353                      struct ost_lvb *lvb, int kms_valid,
3354                      obd_enqueue_update_f upcall, void *cookie,
3355                      struct ldlm_enqueue_info *einfo,
3356                      struct lustre_handle *lockh,
3357                      struct ptlrpc_request_set *rqset, int async, int agl)
3358 {
3359         struct obd_device *obd = exp->exp_obd;
3360         struct ptlrpc_request *req = NULL;
3361         int intent = *flags & LDLM_FL_HAS_INTENT;
3362         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
3363         ldlm_mode_t mode;
3364         int rc;
3365         ENTRY;
3366
3367         /* Filesystem lock extents are extended to page boundaries so that
3368          * dealing with the page cache is a little smoother.  */
3369         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3370         policy->l_extent.end |= ~CFS_PAGE_MASK;
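        /* e.g. with CFS_PAGE_SIZE == 4096 (so ~CFS_PAGE_MASK == 4095), a
         * requested extent [5000, 12000] becomes [4096, 12287]: the start
         * is rounded down and the end up to page boundaries */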
3371
3372         /*
3373          * kms is not valid when either object is completely fresh (so that no
3374          * locks are cached), or object was evicted. In the latter case cached
3375          * lock cannot be used, because it would prime inode state with
3376          * potentially stale LVB.
3377          */
3378         if (!kms_valid)
3379                 goto no_match;
3380
3381         /* Next, search for already existing extent locks that will cover us */
3382         /* If we're trying to read, we also search for an existing PW lock.  The
3383          * VFS and page cache already protect us locally, so lots of readers/
3384          * writers can share a single PW lock.
3385          *
3386          * There are problems with conversion deadlocks, so instead of
3387          * converting a read lock to a write lock, we'll just enqueue a new
3388          * one.
3389          *
3390          * At some point we should cancel the read lock instead of making them
3391          * send us a blocking callback, but there are problems with canceling
3392          * locks out from other users right now, too. */
3393         mode = einfo->ei_mode;
3394         if (einfo->ei_mode == LCK_PR)
3395                 mode |= LCK_PW;
3396         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
3397                                einfo->ei_type, policy, mode, lockh, 0);
3398         if (mode) {
3399                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3400
3401                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
3402                         /* For AGL, if the enqueue RPC was sent but the lock
3403                          * was not granted, then skip processing this stripe.
3404                          * Return -ECANCELED to tell the caller. */
3405                         ldlm_lock_decref(lockh, mode);
3406                         LDLM_LOCK_PUT(matched);
3407                         RETURN(-ECANCELED);
3408                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
3409                         *flags |= LDLM_FL_LVB_READY;
3410                         /* addref the lock only if not async requests and PW
3411                          * lock is matched whereas we asked for PR. */
3412                         if (!rqset && einfo->ei_mode != mode)
3413                                 ldlm_lock_addref(lockh, LCK_PR);
3414                         if (intent) {
3415                                 /* I would like to be able to ASSERT here that
3416                                  * rss <= kms, but I can't, for reasons which
3417                                  * are explained in lov_enqueue() */
3418                         }
3419
3420                         /* We already have a lock, and it's referenced */
3421                         (*upcall)(cookie, ELDLM_OK);
3422
3423                         if (einfo->ei_mode != mode)
3424                                 ldlm_lock_decref(lockh, LCK_PW);
3425                         else if (rqset)
3426                                 /* For async requests, decref the lock. */
3427                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3428                         LDLM_LOCK_PUT(matched);
3429                         RETURN(ELDLM_OK);
3430                 } else {
3431                         ldlm_lock_decref(lockh, mode);
3432                         LDLM_LOCK_PUT(matched);
3433                 }
3434         }
3435
3436  no_match:
3437         if (intent) {
3438                 CFS_LIST_HEAD(cancels);
3439                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3440                                            &RQF_LDLM_ENQUEUE_LVB);
3441                 if (req == NULL)
3442                         RETURN(-ENOMEM);
3443
3444                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3445                 if (rc) {
3446                         ptlrpc_request_free(req);
3447                         RETURN(rc);
3448                 }
3449
3450                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3451                                      sizeof *lvb);
3452                 ptlrpc_request_set_replen(req);
3453         }
3454
3455         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3456         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3457
3458         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3459                               sizeof(*lvb), lockh, async);
3460         if (rqset) {
3461                 if (!rc) {
3462                         struct osc_enqueue_args *aa;
3463                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3464                         aa = ptlrpc_req_async_args(req);
3465                         aa->oa_ei = einfo;
3466                         aa->oa_exp = exp;
3467                         aa->oa_flags  = flags;
3468                         aa->oa_upcall = upcall;
3469                         aa->oa_cookie = cookie;
3470                         aa->oa_lvb    = lvb;
3471                         aa->oa_lockh  = lockh;
3472                         aa->oa_agl    = !!agl;
3473
3474                         req->rq_interpret_reply =
3475                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3476                         if (rqset == PTLRPCD_SET)
3477                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3478                         else
3479                                 ptlrpc_set_add_req(rqset, req);
3480                 } else if (intent) {
3481                         ptlrpc_req_finished(req);
3482                 }
3483                 RETURN(rc);
3484         }
3485
3486         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
3487         if (intent)
3488                 ptlrpc_req_finished(req);
3489
3490         RETURN(rc);
3491 }
3492
3493 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3494                        struct ldlm_enqueue_info *einfo,
3495                        struct ptlrpc_request_set *rqset)
3496 {
3497         struct ldlm_res_id res_id;
3498         int rc;
3499         ENTRY;
3500
3501         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3502                            oinfo->oi_md->lsm_object_seq, &res_id);
3503
3504         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3505                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3506                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3507                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3508                               rqset, rqset != NULL, 0);
3509         RETURN(rc);
3510 }
3511
3512 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3513                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3514                    int *flags, void *data, struct lustre_handle *lockh,
3515                    int unref)
3516 {
3517         struct obd_device *obd = exp->exp_obd;
3518         int lflags = *flags;
3519         ldlm_mode_t rc;
3520         ENTRY;
3521
3522         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3523                 RETURN(-EIO);
3524
3525         /* Filesystem lock extents are extended to page boundaries so that
3526          * dealing with the page cache is a little smoother */
3527         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3528         policy->l_extent.end |= ~CFS_PAGE_MASK;
3529
3530         /* Next, search for already existing extent locks that will cover us */
3531         /* If we're trying to read, we also search for an existing PW lock.  The
3532          * VFS and page cache already protect us locally, so lots of readers/
3533          * writers can share a single PW lock. */
3534         rc = mode;
3535         if (mode == LCK_PR)
3536                 rc |= LCK_PW;
3537         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3538                              res_id, type, policy, rc, lockh, unref);
3539         if (rc) {
3540                 if (data != NULL) {
3541                         if (!osc_set_data_with_check(lockh, data)) {
3542                                 if (!(lflags & LDLM_FL_TEST_LOCK))
3543                                         ldlm_lock_decref(lockh, rc);
3544                                 RETURN(0);
3545                         }
3546                 }
3547                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3548                         ldlm_lock_addref(lockh, LCK_PR);
3549                         ldlm_lock_decref(lockh, LCK_PW);
3550                 }
3551                 RETURN(rc);
3552         }
3553         RETURN(rc);
3554 }
3555
3556 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3557 {
3558         ENTRY;
3559
3560         if (unlikely(mode == LCK_GROUP))
3561                 ldlm_lock_decref_and_cancel(lockh, mode);
3562         else
3563                 ldlm_lock_decref(lockh, mode);
3564
3565         RETURN(0);
3566 }
3567
3568 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3569                       __u32 mode, struct lustre_handle *lockh)
3570 {
3571         ENTRY;
3572         RETURN(osc_cancel_base(lockh, mode));
3573 }
3574
3575 static int osc_cancel_unused(struct obd_export *exp,
3576                              struct lov_stripe_md *lsm,
3577                              ldlm_cancel_flags_t flags,
3578                              void *opaque)
3579 {
3580         struct obd_device *obd = class_exp2obd(exp);
3581         struct ldlm_res_id res_id, *resp = NULL;
3582
3583         if (lsm != NULL) {
3584                 resp = osc_build_res_name(lsm->lsm_object_id,
3585                                           lsm->lsm_object_seq, &res_id);
3586         }
3587
3588         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3589 }
3590
3591 static int osc_statfs_interpret(const struct lu_env *env,
3592                                 struct ptlrpc_request *req,
3593                                 struct osc_async_args *aa, int rc)
3594 {
3595         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3596         struct obd_statfs *msfs;
3597         __u64 used;
3598         ENTRY;
3599
3600         if (rc == -EBADR)
3601                 /* The request has in fact never been sent
3602                  * due to issues at a higher level (LOV).
3603                  * Exit immediately since the caller is
3604                  * aware of the problem and takes care
3605                  * of the cleanup */
3606                 RETURN(rc);
3607
3608         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3609             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3610                 GOTO(out, rc = 0);
3611
3612         if (rc != 0)
3613                 GOTO(out, rc);
3614
3615         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3616         if (msfs == NULL) {
3617                 GOTO(out, rc = -EPROTO);
3618         }
3619
3620         /* Reinitialize the RDONLY and DEGRADED flags at the client
3621          * on each statfs, so they don't stay set permanently. */
3622         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3623
3624         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3625                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3626         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3627                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3628
3629         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3630                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3631         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3632                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3633
3634         /* Add a bit of hysteresis so this flag isn't continually flapping,
3635          * and ensure that new files don't get extremely fragmented due to
3636          * only a small amount of available space in the filesystem.
3637          * We want to set the NOSPC flag when there is less than ~0.1% free
3638          * and clear it when there is at least ~0.2% free space, so:
3639          *                   avail < ~0.1% max          max = avail + used
3640          *            1025 * avail < avail + used       used = blocks - free
3641          *            1024 * avail < used
3642          *            1024 * avail < blocks - free
3643          *                   avail < ((blocks - free) >> 10)
3644          *
3645          * On a very large disk, say 16TB, 0.1% will be 16 GB. We don't want
3646          * to lose that much space, so in those cases we report no space left
3647          * if there is less than 1 GB left.                             */
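        /* Worked example (hypothetical numbers): with os_blocks - os_bfree
         * == 2^21 blocks used, used = 2^21 >> 10 = 2048, so NOSPC is set
         * once os_bavail drops below 2048 blocks (or os_ffree < 32) and is
         * cleared only when os_bavail exceeds 4096 (used << 1) and
         * os_ffree > 64. */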
3648         used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3649         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3650                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3651                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3652         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3653                           (msfs->os_ffree > 64) &&
3654                           (msfs->os_bavail > (used << 1)))) {
3655                 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
3656                                              OSCC_FLAG_NOSPC_BLK);
3657         }
3658
3659         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3660                      (msfs->os_bavail < used)))
3661                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
3662
3663         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3664
3665         *aa->aa_oi->oi_osfs = *msfs;
3666 out:
3667         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3668         RETURN(rc);
3669 }
3670
3671 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3672                             __u64 max_age, struct ptlrpc_request_set *rqset)
3673 {
3674         struct ptlrpc_request *req;
3675         struct osc_async_args *aa;
3676         int                    rc;
3677         ENTRY;
3678
3679         /* We could possibly pass max_age in the request (as an absolute
3680          * timestamp or a "seconds.usec ago") so the target can avoid doing
3681          * extra calls into the filesystem if that isn't necessary (e.g.
3682          * during mount that would help a bit).  Having relative timestamps
3683          * is not so great if request processing is slow, while absolute
3684          * timestamps are not ideal because they need time synchronization. */
3685         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3686         if (req == NULL)
3687                 RETURN(-ENOMEM);
3688
3689         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3690         if (rc) {
3691                 ptlrpc_request_free(req);
3692                 RETURN(rc);
3693         }
3694         ptlrpc_request_set_replen(req);
3695         req->rq_request_portal = OST_CREATE_PORTAL;
3696         ptlrpc_at_set_req_timeout(req);
3697
3698         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3699                 /* procfs requests should not wait on statfs, to avoid deadlock */
3700                 req->rq_no_resend = 1;
3701                 req->rq_no_delay = 1;
3702         }
3703
3704         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3705         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3706         aa = ptlrpc_req_async_args(req);
3707         aa->aa_oi = oinfo;
3708
3709         ptlrpc_set_add_req(rqset, req);
3710         RETURN(0);
3711 }
3712
3713 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3714                       __u64 max_age, __u32 flags)
3715 {
3716         struct obd_statfs     *msfs;
3717         struct ptlrpc_request *req;
3718         struct obd_import     *imp = NULL;
3719         int rc;
3720         ENTRY;
3721
3722         /* Since the request might also come from lprocfs, we need to
3723          * sync this with client_disconnect_export (bug 15684) */
3724         cfs_down_read(&obd->u.cli.cl_sem);
3725         if (obd->u.cli.cl_import)
3726                 imp = class_import_get(obd->u.cli.cl_import);
3727         cfs_up_read(&obd->u.cli.cl_sem);
3728         if (!imp)
3729                 RETURN(-ENODEV);
3730
3731         /* We could possibly pass max_age in the request (as an absolute
3732          * timestamp or a "seconds.usec ago") so the target can avoid doing
3733          * extra calls into the filesystem if that isn't necessary (e.g.
3734          * during mount that would help a bit).  Having relative timestamps
3735          * is not so great if request processing is slow, while absolute
3736          * timestamps are not ideal because they need time synchronization. */
3737         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3738
3739         class_import_put(imp);
3740
3741         if (req == NULL)
3742                 RETURN(-ENOMEM);
3743
3744         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3745         if (rc) {
3746                 ptlrpc_request_free(req);
3747                 RETURN(rc);
3748         }
3749         ptlrpc_request_set_replen(req);
3750         req->rq_request_portal = OST_CREATE_PORTAL;
3751         ptlrpc_at_set_req_timeout(req);
3752
3753         if (flags & OBD_STATFS_NODELAY) {
3754                 /* procfs requests should not wait on statfs, to avoid deadlock */
3755                 req->rq_no_resend = 1;
3756                 req->rq_no_delay = 1;
3757         }
3758
3759         rc = ptlrpc_queue_wait(req);
3760         if (rc)
3761                 GOTO(out, rc);
3762
3763         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3764         if (msfs == NULL) {
3765                 GOTO(out, rc = -EPROTO);
3766         }
3767
3768         *osfs = *msfs;
3769
3770         EXIT;
3771  out:
3772         ptlrpc_req_finished(req);
3773         return rc;
3774 }
3775
3776 /* Retrieve object striping information.
3777  *
3778  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3779  * the maximum number of OST indices which will fit in the user buffer.
3780  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3781  */
3782 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3783 {
3784         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3785         struct lov_user_md_v3 lum, *lumk;
3786         struct lov_user_ost_data_v1 *lmm_objects;
3787         int rc = 0, lum_size;
3788         ENTRY;
3789
3790         if (!lsm)
3791                 RETURN(-ENODATA);
3792
3793         /* we only need the header part from user space to get lmm_magic and
3794          * lmm_stripe_count, (the header part is common to v1 and v3) */
3795         lum_size = sizeof(struct lov_user_md_v1);
3796         if (cfs_copy_from_user(&lum, lump, lum_size))
3797                 RETURN(-EFAULT);
3798
3799         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3800             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3801                 RETURN(-EINVAL);
3802
3803         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3804         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3805         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3806         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3807
3808         /* we can use lov_mds_md_size() to compute lum_size
3809          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3810         if (lum.lmm_stripe_count > 0) {
3811                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3812                 OBD_ALLOC(lumk, lum_size);
3813                 if (!lumk)
3814                         RETURN(-ENOMEM);
3815
3816                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3817                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3818                 else
3819                         lmm_objects = &(lumk->lmm_objects[0]);
3820                 lmm_objects->l_object_id = lsm->lsm_object_id;
3821         } else {
3822                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3823                 lumk = &lum;
3824         }
3825
3826         lumk->lmm_object_id = lsm->lsm_object_id;
3827         lumk->lmm_object_seq = lsm->lsm_object_seq;
3828         lumk->lmm_stripe_count = 1;
3829
3830         if (cfs_copy_to_user(lump, lumk, lum_size))
3831                 rc = -EFAULT;
3832
3833         if (lumk != &lum)
3834                 OBD_FREE(lumk, lum_size);
3835
3836         RETURN(rc);
3837 }
3838
3839
3840 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3841                          void *karg, void *uarg)
3842 {
3843         struct obd_device *obd = exp->exp_obd;
3844         struct obd_ioctl_data *data = karg;
3845         int err = 0;
3846         ENTRY;
3847
3848         if (!cfs_try_module_get(THIS_MODULE)) {
3849                 CERROR("Can't get module. Is it alive?\n");
3850                 return -EINVAL;
3851         }
3852         switch (cmd) {
3853         case OBD_IOC_LOV_GET_CONFIG: {
3854                 char *buf;
3855                 struct lov_desc *desc;
3856                 struct obd_uuid uuid;
3857
3858                 buf = NULL;
3859                 len = 0;
3860                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3861                         GOTO(out, err = -EINVAL);
3862
3863                 data = (struct obd_ioctl_data *)buf;
3864
3865                 if (sizeof(*desc) > data->ioc_inllen1) {
3866                         obd_ioctl_freedata(buf, len);
3867                         GOTO(out, err = -EINVAL);
3868                 }
3869
3870                 if (data->ioc_inllen2 < sizeof(uuid)) {
3871                         obd_ioctl_freedata(buf, len);
3872                         GOTO(out, err = -EINVAL);
3873                 }
3874
3875                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3876                 desc->ld_tgt_count = 1;
3877                 desc->ld_active_tgt_count = 1;
3878                 desc->ld_default_stripe_count = 1;
3879                 desc->ld_default_stripe_size = 0;
3880                 desc->ld_default_stripe_offset = 0;
3881                 desc->ld_pattern = 0;
3882                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3883
3884                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3885
3886                 err = cfs_copy_to_user((void *)uarg, buf, len);
3887                 if (err)
3888                         err = -EFAULT;
3889                 obd_ioctl_freedata(buf, len);
3890                 GOTO(out, err);
3891         }
3892         case LL_IOC_LOV_SETSTRIPE:
3893                 err = obd_alloc_memmd(exp, karg);
3894                 if (err > 0)
3895                         err = 0;
3896                 GOTO(out, err);
3897         case LL_IOC_LOV_GETSTRIPE:
3898                 err = osc_getstripe(karg, uarg);
3899                 GOTO(out, err);
3900         case OBD_IOC_CLIENT_RECOVER:
3901                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3902                                             data->ioc_inlbuf1, 0);
3903                 if (err > 0)
3904                         err = 0;
3905                 GOTO(out, err);
3906         case IOC_OSC_SET_ACTIVE:
3907                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3908                                                data->ioc_offset);
3909                 GOTO(out, err);
3910         case OBD_IOC_POLL_QUOTACHECK:
3911                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3912                 GOTO(out, err);
3913         case OBD_IOC_PING_TARGET:
3914                 err = ptlrpc_obd_ping(obd);
3915                 GOTO(out, err);
3916         default:
3917                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3918                        cmd, cfs_curproc_comm());
3919                 GOTO(out, err = -ENOTTY);
3920         }
3921 out:
3922         cfs_module_put(THIS_MODULE);
3923         return err;
3924 }
3925
3926 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3927                         void *key, __u32 *vallen, void *val,
3928                         struct lov_stripe_md *lsm)
3929 {
3930         ENTRY;
3931         if (!vallen || !val)
3932                 RETURN(-EFAULT);
3933
3934         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3935                 __u32 *stripe = val;
3936                 *vallen = sizeof(*stripe);
3937                 *stripe = 0;
3938                 RETURN(0);
3939         } else if (KEY_IS(KEY_LAST_ID)) {
3940                 struct ptlrpc_request *req;
3941                 obd_id                *reply;
3942                 char                  *tmp;
3943                 int                    rc;
3944
3945                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3946                                            &RQF_OST_GET_INFO_LAST_ID);
3947                 if (req == NULL)
3948                         RETURN(-ENOMEM);
3949
3950                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3951                                      RCL_CLIENT, keylen);
3952                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3953                 if (rc) {
3954                         ptlrpc_request_free(req);
3955                         RETURN(rc);
3956                 }
3957
3958                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3959                 memcpy(tmp, key, keylen);
3960
3961                 req->rq_no_delay = req->rq_no_resend = 1;
3962                 ptlrpc_request_set_replen(req);
3963                 rc = ptlrpc_queue_wait(req);
3964                 if (rc)
3965                         GOTO(out, rc);
3966
3967                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3968                 if (reply == NULL)
3969                         GOTO(out, rc = -EPROTO);
3970
3971                 *((obd_id *)val) = *reply;
3972         out:
3973                 ptlrpc_req_finished(req);
3974                 RETURN(rc);
3975         } else if (KEY_IS(KEY_FIEMAP)) {
3976                 struct ptlrpc_request *req;
3977                 struct ll_user_fiemap *reply;
3978                 char *tmp;
3979                 int rc;
3980
3981                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3982                                            &RQF_OST_GET_INFO_FIEMAP);
3983                 if (req == NULL)
3984                         RETURN(-ENOMEM);
3985
3986                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3987                                      RCL_CLIENT, keylen);
3988                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3989                                      RCL_CLIENT, *vallen);
3990                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3991                                      RCL_SERVER, *vallen);
3992
3993                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3994                 if (rc) {
3995                         ptlrpc_request_free(req);
3996                         RETURN(rc);
3997                 }
3998
3999                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
4000                 memcpy(tmp, key, keylen);
4001                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4002                 memcpy(tmp, val, *vallen);
4003
4004                 ptlrpc_request_set_replen(req);
4005                 rc = ptlrpc_queue_wait(req);
4006                 if (rc)
4007                         GOTO(out1, rc);
4008
4009                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4010                 if (reply == NULL)
4011                         GOTO(out1, rc = -EPROTO);
4012
4013                 memcpy(val, reply, *vallen);
4014         out1:
4015                 ptlrpc_req_finished(req);
4016
4017                 RETURN(rc);
4018         }
4019
4020         RETURN(-EINVAL);
4021 }
4022
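/*
 * Hook the MDS->OST originator llog context up to its initiator and mark
 * the import as pingable with server-side timeouts enabled.  Called from
 * osc_setinfo_mds_conn_interpret() below once the KEY_MDS_CONN setinfo
 * reply has arrived.
 */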
4023 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4024 {
4025         struct llog_ctxt *ctxt;
4026         int rc = 0;
4027         ENTRY;
4028
4029         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4030         if (ctxt) {
4031                 rc = llog_initiator_connect(ctxt);
4032                 llog_ctxt_put(ctxt);
4033         } else {
                /* XXX Should we return an error here, or skip setting the
                 * flags below? */
4035         }
4036
4037         cfs_spin_lock(&imp->imp_lock);
4038         imp->imp_server_timeout = 1;
4039         imp->imp_pingable = 1;
4040         cfs_spin_unlock(&imp->imp_lock);
4041         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4042
4043         RETURN(rc);
4044 }
4045
4046 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4047                                           struct ptlrpc_request *req,
4048                                           void *aa, int rc)
4049 {
4050         ENTRY;
4051         if (rc != 0)
4052                 RETURN(rc);
4053
4054         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
4055 }
4056
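/*
 * Handle obd_set_info_async().  Keys that only touch local state
 * (KEY_NEXT_ID, KEY_CHECKSUM, KEY_SPTLRPC_CONF, KEY_FLUSH_CTX) are served
 * without an RPC; everything else is packed into an OST_SET_INFO request
 * that is either added to the caller's request set or, for
 * KEY_GRANT_SHRINK, handed to ptlrpcd for asynchronous processing.
 */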
4057 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4058                               void *key, obd_count vallen, void *val,
4059                               struct ptlrpc_request_set *set)
4060 {
4061         struct ptlrpc_request *req;
4062         struct obd_device     *obd = exp->exp_obd;
4063         struct obd_import     *imp = class_exp2cliimp(exp);
4064         char                  *tmp;
4065         int                    rc;
4066         ENTRY;
4067
4068         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4069
4070         if (KEY_IS(KEY_NEXT_ID)) {
4071                 obd_id new_val;
4072                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4073
4074                 if (vallen != sizeof(obd_id))
4075                         RETURN(-ERANGE);
4076                 if (val == NULL)
4077                         RETURN(-EINVAL);
4081
                /* Avoid a race between allocating a new object and setting
                 * the next id from the ll_sync thread. */
4084                 cfs_spin_lock(&oscc->oscc_lock);
4085                 new_val = *((obd_id*)val) + 1;
4086                 if (new_val > oscc->oscc_next_id)
4087                         oscc->oscc_next_id = new_val;
4088                 cfs_spin_unlock(&oscc->oscc_lock);
4089                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4090                        exp->exp_obd->obd_name,
4091                        obd->u.cli.cl_oscc.oscc_next_id);
4092
4093                 RETURN(0);
4094         }
4095
4096         if (KEY_IS(KEY_CHECKSUM)) {
4097                 if (vallen != sizeof(int))
4098                         RETURN(-EINVAL);
4099                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4100                 RETURN(0);
4101         }
4102
4103         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4104                 sptlrpc_conf_client_adapt(obd);
4105                 RETURN(0);
4106         }
4107
4108         if (KEY_IS(KEY_FLUSH_CTX)) {
4109                 sptlrpc_import_flush_my_ctx(imp);
4110                 RETURN(0);
4111         }
4112
4113         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4114                 RETURN(-EINVAL);
4115
        /* We pass all other commands directly to the OST. Since nobody calls
         * osc methods directly and everybody is supposed to go through the
         * LOV, we assume the LOV has already validated the values for us.
         * The only recognised values so far are evict_by_nid and mds_conn.
         * Even if something bad gets through, we'd get an -EINVAL from the
         * OST anyway. */
4122
4123         if (KEY_IS(KEY_GRANT_SHRINK))
4124                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4125         else
4126                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4127
4128         if (req == NULL)
4129                 RETURN(-ENOMEM);
4130
4131         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4132                              RCL_CLIENT, keylen);
4133         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4134                              RCL_CLIENT, vallen);
4135         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4136         if (rc) {
4137                 ptlrpc_request_free(req);
4138                 RETURN(rc);
4139         }
4140
4141         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4142         memcpy(tmp, key, keylen);
4143         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4144         memcpy(tmp, val, vallen);
4145
4146         if (KEY_IS(KEY_MDS_CONN)) {
4147                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4148
4149                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4150                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4151                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4152                 req->rq_no_delay = req->rq_no_resend = 1;
4153                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4154         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4155                 struct osc_grant_args *aa;
4156                 struct obdo *oa;
4157
4158                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4159                 aa = ptlrpc_req_async_args(req);
4160                 OBDO_ALLOC(oa);
4161                 if (!oa) {
4162                         ptlrpc_req_finished(req);
4163                         RETURN(-ENOMEM);
4164                 }
4165                 *oa = ((struct ost_body *)val)->oa;
4166                 aa->aa_oa = oa;
4167                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4168         }
4169
4170         ptlrpc_request_set_replen(req);
4171         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4172                 LASSERT(set != NULL);
4173                 ptlrpc_set_add_req(set, req);
4174                 ptlrpc_check_set(NULL, set);
4175         } else
4176                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
4177
4178         RETURN(0);
4179 }
4180
4181
4182 static struct llog_operations osc_size_repl_logops = {
4183         lop_cancel: llog_obd_repl_cancel
4184 };
4185
4186 static struct llog_operations osc_mds_ost_orig_logops;
4187
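/*
 * Set up the two llog contexts used on the MDS side of this OSC: the
 * MDS->OST originator context and the size-replication context.  If the
 * second setup fails, the first context is cleaned up again so that we
 * never leave a half-initialized pair behind.
 */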
4188 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4189                            struct obd_device *tgt, struct llog_catid *catid)
4190 {
4191         int rc;
4192         ENTRY;
4193
4194         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4195                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4196         if (rc) {
                CERROR("failed to setup LLOG_MDS_OST_ORIG_CTXT\n");
4198                 GOTO(out, rc);
4199         }
4200
4201         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4202                         NULL, &osc_size_repl_logops);
4203         if (rc) {
4204                 struct llog_ctxt *ctxt =
4205                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4206                 if (ctxt)
4207                         llog_cleanup(ctxt);
                CERROR("failed to setup LLOG_SIZE_REPL_CTXT\n");
4209         }
4210         GOTO(out, rc);
4211 out:
4212         if (rc) {
4213                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4214                        obd->obd_name, tgt->obd_name, catid, rc);
4215                 CERROR("logid "LPX64":0x%x\n",
4216                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4217         }
4218         return rc;
4219 }
4220
4221 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4222                          struct obd_device *disk_obd, int *index)
4223 {
4224         struct llog_catid catid;
4225         static char name[32] = CATLIST;
4226         int rc;
4227         ENTRY;
4228
4229         LASSERT(olg == &obd->obd_olg);
4230
4231         cfs_mutex_down(&olg->olg_cat_processing);
4232         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4233         if (rc) {
                CERROR("llog_get_cat_list failed: rc = %d\n", rc);
4235                 GOTO(out, rc);
4236         }
4237
4238         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4239                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4240                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4241
4242         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4243         if (rc) {
                CERROR("__osc_llog_init failed: rc = %d\n", rc);
4245                 GOTO(out, rc);
4246         }
4247
4248         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4249         if (rc) {
                CERROR("llog_put_cat_list failed: rc = %d\n", rc);
4251                 GOTO(out, rc);
4252         }
4253
4254  out:
4255         cfs_mutex_up(&olg->olg_cat_processing);
4256
4257         return rc;
4258 }
4259
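/* Tear down both llog contexts; the first nonzero cleanup rc is returned. */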
4260 static int osc_llog_finish(struct obd_device *obd, int count)
4261 {
4262         struct llog_ctxt *ctxt;
4263         int rc = 0, rc2 = 0;
4264         ENTRY;
4265
4266         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4267         if (ctxt)
4268                 rc = llog_cleanup(ctxt);
4269
4270         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4271         if (ctxt)
4272                 rc2 = llog_cleanup(ctxt);
4273         if (!rc)
4274                 rc = rc2;
4275
4276         RETURN(rc);
4277 }
4278
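/*
 * On reconnect, ask the server for enough grant to cover the pages we
 * already cached (cl_avail_grant + cl_dirty), or two full RPCs worth if
 * we currently hold none, and reset the lost-grant counter.
 */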
4279 static int osc_reconnect(const struct lu_env *env,
4280                          struct obd_export *exp, struct obd_device *obd,
4281                          struct obd_uuid *cluuid,
4282                          struct obd_connect_data *data,
4283                          void *localdata)
4284 {
4285         struct client_obd *cli = &obd->u.cli;
4286
4287         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4288                 long lost_grant;
4289
4290                 client_obd_list_lock(&cli->cl_loi_list_lock);
4291                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4292                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4293                 lost_grant = cli->cl_lost_grant;
4294                 cli->cl_lost_grant = 0;
4295                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4296
4297                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4298                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4299                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4300                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4301                        " ocd_grant: %d\n", data->ocd_connect_flags,
4302                        data->ocd_version, data->ocd_grant);
4303         }
4304
4305         RETURN(0);
4306 }
4307
4308 static int osc_disconnect(struct obd_export *exp)
4309 {
4310         struct obd_device *obd = class_exp2obd(exp);
4311         struct llog_ctxt  *ctxt;
4312         int rc;
4313
4314         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4315         if (ctxt) {
4316                 if (obd->u.cli.cl_conn_count == 1) {
4317                         /* Flush any remaining cancel messages out to the
4318                          * target */
4319                         llog_sync(ctxt, exp);
4320                 }
4321                 llog_ctxt_put(ctxt);
4322         } else {
4323                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4324                        obd);
4325         }
4326
4327         rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but
         * that causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together:
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interpret
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! The pinger triggers the shrink.
         * So the osc should only be removed from the shrink list once we
         * are sure the import has been destroyed. BUG18662
         */
4345         if (obd->u.cli.cl_import == NULL)
4346                 osc_del_shrink_grant(&obd->u.cli);
4347         return rc;
4348 }
4349
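/*
 * React to import state changes: zero the grants on disconnect, push
 * cached pages into (failing) RPCs and clean up local locks on
 * invalidation, clear the creator's no-space flags when the OST comes
 * back, and forward the remaining events to the observer (typically the
 * LOV).
 */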
4350 static int osc_import_event(struct obd_device *obd,
4351                             struct obd_import *imp,
4352                             enum obd_import_event event)
4353 {
4354         struct client_obd *cli;
4355         int rc = 0;
4356
4357         ENTRY;
4358         LASSERT(imp->imp_obd == obd);
4359
4360         switch (event) {
4361         case IMP_EVENT_DISCON: {
                /* Only do this for the MDS OSCs */
4363                 if (imp->imp_server_timeout) {
4364                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4365
4366                         cfs_spin_lock(&oscc->oscc_lock);
4367                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4368                         cfs_spin_unlock(&oscc->oscc_lock);
4369                 }
4370                 cli = &obd->u.cli;
4371                 client_obd_list_lock(&cli->cl_loi_list_lock);
4372                 cli->cl_avail_grant = 0;
4373                 cli->cl_lost_grant = 0;
4374                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4375                 break;
4376         }
4377         case IMP_EVENT_INACTIVE: {
4378                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4379                 break;
4380         }
4381         case IMP_EVENT_INVALIDATE: {
4382                 struct ldlm_namespace *ns = obd->obd_namespace;
4383                 struct lu_env         *env;
4384                 int                    refcheck;
4385
4386                 env = cl_env_get(&refcheck);
4387                 if (!IS_ERR(env)) {
4388                         /* Reset grants */
4389                         cli = &obd->u.cli;
4390                         client_obd_list_lock(&cli->cl_loi_list_lock);
                        /* all pages go into failing RPCs due to the
                         * invalid import */
4393                         osc_check_rpcs(env, cli);
4394                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4395
4396                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4397                         cl_env_put(env, &refcheck);
4398                 } else
4399                         rc = PTR_ERR(env);
4400                 break;
4401         }
4402         case IMP_EVENT_ACTIVE: {
                /* Only do this for the MDS OSCs */
4404                 if (imp->imp_server_timeout) {
4405                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4406
4407                         cfs_spin_lock(&oscc->oscc_lock);
4408                         oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
4409                                               OSCC_FLAG_NOSPC_BLK);
4410                         cfs_spin_unlock(&oscc->oscc_lock);
4411                 }
4412                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4413                 break;
4414         }
4415         case IMP_EVENT_OCD: {
4416                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4417
4418                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4419                         osc_init_grant(&obd->u.cli, ocd);
4420
4421                 /* See bug 7198 */
4422                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =
                                OST_REQUEST_PORTAL;
4424
4425                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4426                 break;
4427         }
4428         case IMP_EVENT_DEACTIVATE: {
4429                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4430                 break;
4431         }
4432         case IMP_EVENT_ACTIVATE: {
4433                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4434                 break;
4435         }
4436         default:
4437                 CERROR("Unknown import event %d\n", event);
4438                 LBUG();
4439         }
4440         RETURN(rc);
4441 }
4442
/**
 * Determine whether a lock can be canceled before being replayed during
 * recovery; see bug 16774 for the details.
 *
 * \retval zero  the lock can't be canceled
 * \retval other ok to cancel
 */
4450 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4451 {
4452         check_res_locked(lock->l_resource);
4453
        /*
         * Cancel all unused extent locks granted in LCK_PR or LCK_CR mode.
         *
         * XXX As a future improvement, we could also cancel unused write
         * locks that cover no dirty data and have no active mmaps.
         */
4460         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4461             (lock->l_granted_mode == LCK_PR ||
4462              lock->l_granted_mode == LCK_CR) &&
4463             (osc_dlm_lock_pageref(lock) == 0))
4464                 RETURN(1);
4465
4466         RETURN(0);
4467 }
4468
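/*
 * Attach-time setup: take a ptlrpcd reference, create the writeback work
 * item, register the lprocfs entries, initialize the object creator and
 * the request pool, and hook up grant shrinking and the
 * cancel-for-recovery callback.
 */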
4469 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4470 {
4471         struct client_obd *cli = &obd->u.cli;
4472         int rc;
4473         ENTRY;
4474
4476         rc = ptlrpcd_addref();
4477         if (rc)
4478                 RETURN(rc);
4479
4480         rc = client_obd_setup(obd, lcfg);
4481         if (rc == 0) {
4482                 void *handler;
4483                 handler = ptlrpcd_alloc_work(cli->cl_import,
4484                                              brw_queue_work, cli);
4485                 if (!IS_ERR(handler))
4486                         cli->cl_writeback_work = handler;
4487                 else
4488                         rc = PTR_ERR(handler);
4489         }
4490
4491         if (rc == 0) {
4492                 struct lprocfs_static_vars lvars = { 0 };
4493
4494                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4495                 lprocfs_osc_init_vars(&lvars);
4496                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4497                         lproc_osc_attach_seqstat(obd);
4498                         sptlrpc_lprocfs_cliobd_attach(obd);
4499                         ptlrpc_lprocfs_register_obd(obd);
4500                 }
4501
4502                 oscc_init(obd);
                /* We need to allocate a few extra requests, because
                 * brw_interpret tries to create new requests before freeing
                 * previous ones. Ideally we would reserve 2x
                 * max_rpcs_in_flight, but that is probably too much wasted
                 * RAM in practice, so +2 is a guess that should still work. */
4508                 cli->cl_import->imp_rq_pool =
4509                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4510                                             OST_MAXREQSIZE,
4511                                             ptlrpc_add_rqs_to_pool);
4512
4513                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4514                 cfs_sema_init(&cli->cl_grant_sem, 1);
4515
4516                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4517         }
4518
4519         if (rc)
4520                 ptlrpcd_decref();
4521         RETURN(rc);
4522 }
4523
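/*
 * Two-stage precleanup: at OBD_CLEANUP_EARLY the import is deactivated
 * and pinging is disabled so that no new requests go out; at
 * OBD_CLEANUP_EXPORTS the writeback work item, the client import, the
 * proc entries and the llog contexts are all released.
 */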
4524 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4525 {
4526         int rc = 0;
4527         ENTRY;
4528
4529         switch (stage) {
4530         case OBD_CLEANUP_EARLY: {
4531                 struct obd_import *imp;
4532                 imp = obd->u.cli.cl_import;
4533                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4534                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4535                 ptlrpc_deactivate_import(imp);
4536                 cfs_spin_lock(&imp->imp_lock);
4537                 imp->imp_pingable = 0;
4538                 cfs_spin_unlock(&imp->imp_lock);
4539                 break;
4540         }
4541         case OBD_CLEANUP_EXPORTS: {
4542                 struct client_obd *cli = &obd->u.cli;
4543                 /* LU-464
4544                  * for echo client, export may be on zombie list, wait for
4545                  * zombie thread to cull it, because cli.cl_import will be
4546                  * cleared in client_disconnect_export():
4547                  *   class_export_destroy() -> obd_cleanup() ->
4548                  *   echo_device_free() -> echo_client_cleanup() ->
4549                  *   obd_disconnect() -> osc_disconnect() ->
4550                  *   client_disconnect_export()
4551                  */
4552                 obd_zombie_barrier();
4553                 if (cli->cl_writeback_work) {
4554                         ptlrpcd_destroy_work(cli->cl_writeback_work);
4555                         cli->cl_writeback_work = NULL;
4556                 }
4557                 obd_cleanup_client_import(obd);
4558                 ptlrpc_lprocfs_unregister_obd(obd);
4559                 lprocfs_obd_cleanup(obd);
4560                 rc = obd_llog_finish(obd, 0);
4561                 if (rc != 0)
4562                         CERROR("failed to cleanup llogging subsystems\n");
4563                 break;
4564                 }
4565         }
4566         RETURN(rc);
4567 }
4568
4569 int osc_cleanup(struct obd_device *obd)
4570 {
4571         int rc;
4572
4573         ENTRY;
4574
4575         /* free memory of osc quota cache */
4576         osc_quota_cleanup(obd);
4577
4578         rc = client_obd_cleanup(obd);
4579
4580         ptlrpcd_decref();
4581         RETURN(rc);
4582 }
4583
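/*
 * Process a configuration command.  Every command currently falls
 * through to class_process_proc_param(), which matches PARAM_OSC
 * tunables against our lprocfs variables.
 */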
4584 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4585 {
4586         struct lprocfs_static_vars lvars = { 0 };
4587         int rc = 0;
4588
4589         lprocfs_osc_init_vars(&lvars);
4590
4591         switch (lcfg->lcfg_command) {
4592         default:
4593                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4594                                               lcfg, obd);
4595                 if (rc > 0)
4596                         rc = 0;
4597                 break;
4598         }
4599
        return rc;
4601 }
4602
4603 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4604 {
4605         return osc_process_config_base(obd, buf);
4606 }
4607
4608 struct obd_ops osc_obd_ops = {
4609         .o_owner                = THIS_MODULE,
4610         .o_setup                = osc_setup,
4611         .o_precleanup           = osc_precleanup,
4612         .o_cleanup              = osc_cleanup,
4613         .o_add_conn             = client_import_add_conn,
4614         .o_del_conn             = client_import_del_conn,
4615         .o_connect              = client_connect_import,
4616         .o_reconnect            = osc_reconnect,
4617         .o_disconnect           = osc_disconnect,
4618         .o_statfs               = osc_statfs,
4619         .o_statfs_async         = osc_statfs_async,
4620         .o_packmd               = osc_packmd,
4621         .o_unpackmd             = osc_unpackmd,
4622         .o_precreate            = osc_precreate,
4623         .o_create               = osc_create,
4624         .o_create_async         = osc_create_async,
4625         .o_destroy              = osc_destroy,
4626         .o_getattr              = osc_getattr,
4627         .o_getattr_async        = osc_getattr_async,
4628         .o_setattr              = osc_setattr,
4629         .o_setattr_async        = osc_setattr_async,
4630         .o_brw                  = osc_brw,
4631         .o_punch                = osc_punch,
4632         .o_sync                 = osc_sync,
4633         .o_enqueue              = osc_enqueue,
4634         .o_change_cbdata        = osc_change_cbdata,
4635         .o_find_cbdata          = osc_find_cbdata,
4636         .o_cancel               = osc_cancel,
4637         .o_cancel_unused        = osc_cancel_unused,
4638         .o_iocontrol            = osc_iocontrol,
4639         .o_get_info             = osc_get_info,
4640         .o_set_info_async       = osc_set_info_async,
4641         .o_import_event         = osc_import_event,
4642         .o_llog_init            = osc_llog_init,
4643         .o_llog_finish          = osc_llog_finish,
4644         .o_process_config       = osc_process_config,
4645         .o_quotactl             = osc_quotactl,
4646         .o_quotacheck           = osc_quotacheck,
4647         .o_quota_adjust_qunit   = osc_quota_adjust_qunit,
4648 };
4649
4650 extern struct lu_kmem_descr osc_caches[];
4651 extern cfs_spinlock_t       osc_ast_guard;
4652 extern cfs_lock_class_key_t osc_ast_guard_class;
4653
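/*
 * Module initialization: set up the slab caches and the AST guard lock,
 * register the OSC device type with its obd_ops, and clone the LVFS llog
 * operations, overriding the origin-specific setup/cleanup/add/connect
 * methods.
 */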
4654 int __init osc_init(void)
4655 {
4656         struct lprocfs_static_vars lvars = { 0 };
4657         int rc;
4658         ENTRY;
4659
        /* Print the address of _any_ initialized kernel symbol from this
         * module to allow debugging with a gdb that doesn't support data
         * symbols from modules. */
4663         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
4664
        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);
4666
4667         lprocfs_osc_init_vars(&lvars);
4668
4669         osc_quota_init();
4670         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4671                                  LUSTRE_OSC_NAME, &osc_device_type);
4672         if (rc) {
4673                 lu_kmem_fini(osc_caches);
4674                 RETURN(rc);
4675         }
4676
4677         cfs_spin_lock_init(&osc_ast_guard);
4678         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4679
4680         osc_mds_ost_orig_logops = llog_lvfs_ops;
4681         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4682         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4683         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4684         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4685
4686         RETURN(rc);
4687 }
4688
4689 #ifdef __KERNEL__
4690 static void /*__exit*/ osc_exit(void)
4691 {
4692         lu_device_type_fini(&osc_device_type);
4693
4694         osc_quota_exit();
4695         class_unregister_type(LUSTRE_OSC_NAME);
4696         lu_kmem_fini(osc_caches);
4697 }
4698
4699 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4700 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4701 MODULE_LICENSE("GPL");
4702
4703 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4704 #endif