/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright (c) 2011 Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_seq = le64_to_cpu(lmm->lmm_object_seq);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
        }

        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
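
/*
 * Usage sketch (illustrative, not part of the original file): osc_packmd()
 * and osc_unpackmd() overload their buffer arguments in the usual obd_packmd()
 * style.  Passing a NULL buffer pointer is a pure size query, a pointer to a
 * NULL buffer requests allocation, and a live buffer with a NULL source frees
 * it:
 *
 *      struct lov_mds_md *lmm = NULL;
 *
 *      int size = osc_packmd(exp, NULL, lsm);  // size query only
 *      size     = osc_packmd(exp, &lmm, lsm);  // allocate and fill lmm
 *      size     = osc_packmd(exp, &lmm, NULL); // free lmm, NULL it, return 0
 */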

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(sa->sa_oa, &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PSCOPE_OTHER);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
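
/*
 * Dispatch sketch (illustrative, not part of the original file): the rqset
 * argument of osc_setattr_async_base() selects one of three delivery modes:
 *
 *      osc_setattr_async_base(exp, oinfo, oti, upcall, cookie, NULL);
 *              // fire-and-forget via ptlrpcd; the reply is not interpreted
 *      osc_setattr_async_base(exp, oinfo, oti, upcall, cookie, PTLRPCD_SET);
 *              // handled by ptlrpcd; upcall runs from osc_setattr_interpret
 *      osc_setattr_async_base(exp, oinfo, oti, upcall, cookie, set);
 *              // added to the caller's set, to be driven by the caller
 */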

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_seq = oa->o_seq;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo,
                              oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_async_args *aa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
                    obd_size start, obd_size end,
                    struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        if (!oinfo->oi_oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
/* Find and cancel locally granted locks matched by @mode in the resource
 * identified by @oa. Found locks are added to the @cancels list. Returns
 * the number of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   cfs_list_t *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        cfs_atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
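
/*
 * Worked example (illustrative, not part of the original file) of the
 * increment-then-test throttle above, with cl_max_rpcs_in_flight == 8 and
 * 8 destroys already in flight:
 *
 *      - cfs_atomic_inc_return() moves the counter 8 -> 9, which exceeds
 *        the limit, so the sender must back out and wait;
 *      - if nothing raced, cfs_atomic_dec_return() sees 9 -> 8, which is
 *        not < 8, so no wakeup is needed: a completing destroy will signal
 *        cl_destroy_waitq from osc_destroy_interpret();
 *      - if a destroy completed between the two atomics (so our dec sees
 *        8 -> 7), that completion may have signalled before we slept, so
 *        we re-signal cl_destroy_waitq ourselves to avoid a missed wakeup.
 */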

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST next reconnects to the MDS,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing the destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * below cl_max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1) {
                /* The cfs_atomic_read()s and the cfs_atomic_inc()s are not
                 * covered by a lock, thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
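
/*
 * Worked example (illustrative, not part of the original file) for the
 * common o_undirty branch above, assuming 4 KiB pages (CFS_PAGE_SHIFT = 12),
 * cl_max_pages_per_rpc = 256 and cl_max_rpcs_in_flight = 8:
 *
 *      max_in_flight = (256 << 12) * (8 + 1) = 9 MiB
 *
 * so with a 32 MiB cl_dirty_max the client advertises
 * o_undirty = max(32 MiB, 9 MiB) = 32 MiB of headroom, and the OST can size
 * the grant it hands back accordingly.
 */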

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
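
/*
 * Worked example (illustrative, not part of the original file) for the
 * short-write branch above (sent != 0), assuming 4 KiB pages and a 1 KiB
 * OST blocksize: a write of pga->count = 1536 bytes at offset 512 within
 * its page gives
 *
 *      count = 1536 + (512 & 1023)  = 2048
 *      end   = (512 + 1536) & 1023  = 0    (block-aligned, no padding)
 *
 * i.e. two full 1 KiB blocks are charged on the OST side, so only
 * 4096 - 2048 = 2048 bytes of grant are recorded as lost instead of the
 * whole page.
 */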

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        cfs_list_t *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
                     obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
                cfs_list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}

int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int                  rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
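
/*
 * Behaviour sketch (illustrative, not part of the original file): the two
 * functions above implement a two-step decay.  While plenty of grant is
 * available, osc_shrink_grant() targets a full pipeline,
 * (cl_max_rpcs_in_flight + 1) * cl_max_pages_per_rpc; once cl_avail_grant
 * is already at or below that, the next target is a single RPC, and
 * osc_shrink_grant_to_target() clamps anything smaller back up to
 * cl_max_pages_per_rpc, so an idle client converges on one RPC worth of
 * grant instead of shrinking piecemeal.
 */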

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0, the OSS is probably not running"
                      " with the patch from bug20278 (%ld)\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
                /* workaround for 1.6 servers which do not have
                 * the patch from bug20278 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
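
/*
 * Worked example (illustrative, not part of the original file): with three
 * 4 KiB pages and nob_read = 6144, the first loop above leaves page 0
 * untouched (all 4096 bytes were read), zeroes page 1 from byte 2048 on
 * (EOF fell inside it), and the second loop zero-fills page 2 entirely,
 * so the caller sees the unwritten tail of the stripe as zeroes.
 */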

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int     i;
        __u32  *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|
                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
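
/*
 * Example (illustrative, not part of the original file): two 4 KiB pages
 * with identical flags at file offsets 8192 and 12288 merge into a single
 * niobuf because 8192 + 4096 == 12288; the same pages at 8192 and 16384
 * leave a gap and do not.  Any flag difference also prevents merging, and
 * a difference outside the known-combinable bits (grant/cache/sync flags)
 * additionally logs a CERROR.
 */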

static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute a wrong checksum instead of corrupting
         * the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
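
/*
 * Boundary note (illustrative, not part of the original file): nob caps the
 * final fragment, so for a short read reply of nob = 5120 over two 4 KiB
 * pages the loop above checksums all 4096 bytes of page 0 but only the
 * first 1024 bytes of page 1, matching what the server actually sent
 * rather than the full page array.
 */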

static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1472
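/* Diagnose a write checksum mismatch reported by the server: recompute the
 * checksum over the pages we still hold to guess where the corruption
 * happened, and log the details.  Returns 0 if the checksums actually agree,
 * 1 if the caller should treat this as a checksum failure (mmapped files
 * return 1 immediately since their pages may change at any time). */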
1473 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1474                                 __u32 client_cksum, __u32 server_cksum, int nob,
1475                                 obd_count page_count, struct brw_page **pga,
1476                                 cksum_type_t client_cksum_type)
1477 {
1478         __u32 new_cksum;
1479         char *msg;
1480         cksum_type_t cksum_type;
1481
1482         if (server_cksum == client_cksum) {
1483                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1484                 return 0;
1485         }
1486
        /* If this is an mmapped file, it can be changed at any time */
1488         if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1489                 return 1;
1490
1491         if (oa->o_valid & OBD_MD_FLFLAGS)
1492                 cksum_type = cksum_type_unpack(oa->o_flags);
1493         else
1494                 cksum_type = OBD_CKSUM_CRC32;
1495
1496         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1497                                       cksum_type);
1498
1499         if (cksum_type != client_cksum_type)
1500                 msg = "the server did not use the checksum type specified in "
1501                       "the original request - likely a protocol problem";
1502         else if (new_cksum == server_cksum)
1503                 msg = "changed on the client after we checksummed it - "
1504                       "likely false positive due to mmap IO (bug 11742)";
1505         else if (new_cksum == client_cksum)
1506                 msg = "changed in transit before arrival at OST";
1507         else
1508                 msg = "changed in transit AND doesn't match the original - "
1509                       "likely false positive due to mmap IO (bug 11742)";
1510
1511         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1512                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1513                            msg, libcfs_nid2str(peer->nid),
1514                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1515                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1516                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1517                            oa->o_id,
1518                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1519                            pga[0]->off,
1520                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1521         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1522                "client csum now %x\n", client_cksum, client_cksum_type,
1523                server_cksum, cksum_type, new_cksum);
1524         return 1;
1525 }
1526
/* Note: rc enters this function as the number of bytes transferred */
1528 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1529 {
1530         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1531         const lnet_process_id_t *peer =
1532                         &req->rq_import->imp_connection->c_peer;
1533         struct client_obd *cli = aa->aa_cli;
1534         struct ost_body *body;
1535         __u32 client_cksum = 0;
1536         ENTRY;
1537
1538         if (rc < 0 && rc != -EDQUOT) {
1539                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1540                 RETURN(rc);
1541         }
1542
1543         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1544         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1545         if (body == NULL) {
1546                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1547                 RETURN(-EPROTO);
1548         }
1549
1550 #ifdef HAVE_QUOTA_SUPPORT
        /* set/clear the over-quota flag for a uid/gid */
1552         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1553             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1554                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1555
1556                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1557                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1558                        body->oa.o_flags);
1559                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1560                              body->oa.o_flags);
1561         }
1562 #endif
1563
1564         osc_update_grant(cli, body);
1565
1566         if (rc < 0)
1567                 RETURN(rc);
1568
1569         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1570                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1571
1572         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1573                 if (rc > 0) {
1574                         CERROR("Unexpected +ve rc %d\n", rc);
1575                         RETURN(-EPROTO);
1576                 }
1577                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1578
1579                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1580                         RETURN(-EAGAIN);
1581
1582                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1583                     check_write_checksum(&body->oa, peer, client_cksum,
1584                                          body->oa.o_cksum, aa->aa_requested_nob,
1585                                          aa->aa_page_count, aa->aa_ppga,
1586                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1587                         RETURN(-EAGAIN);
1588
1589                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1590                                      aa->aa_page_count, aa->aa_ppga);
1591                 GOTO(out, rc);
1592         }
1593
1594         /* The rest of this function executes only for OST_READs */
1595
1596         /* if unwrap_bulk failed, return -EAGAIN to retry */
1597         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1598         if (rc < 0)
1599                 GOTO(out, rc = -EAGAIN);
1600
1601         if (rc > aa->aa_requested_nob) {
1602                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1603                        aa->aa_requested_nob);
1604                 RETURN(-EPROTO);
1605         }
1606
1607         if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);
                RETURN(-EPROTO);
1611         }
1612
1613         if (rc < aa->aa_requested_nob)
1614                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1615
1616         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1617                 static int cksum_counter;
1618                 __u32      server_cksum = body->oa.o_cksum;
1619                 char      *via;
1620                 char      *router;
1621                 cksum_type_t cksum_type;
1622
1623                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1624                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1625                 else
1626                         cksum_type = OBD_CKSUM_CRC32;
1627                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1628                                                  aa->aa_ppga, OST_READ,
1629                                                  cksum_type);
1630
1631                 if (peer->nid == req->rq_bulk->bd_sender) {
1632                         via = router = "";
1633                 } else {
1634                         via = " via ";
1635                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1636                 }
1637
1638                 if (server_cksum == ~0 && rc > 0) {
1639                         CERROR("Protocol error: server %s set the 'checksum' "
1640                                "bit, but didn't send a checksum.  Not fatal, "
1641                                "but please notify on http://bugs.whamcloud.com/\n",
1642                                libcfs_nid2str(peer->nid));
1643                 } else if (server_cksum != client_cksum) {
1644                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1645                                            "%s%s%s inode "DFID" object "
1646                                            LPU64"/"LPU64" extent "
1647                                            "["LPU64"-"LPU64"]\n",
1648                                            req->rq_import->imp_obd->obd_name,
1649                                            libcfs_nid2str(peer->nid),
1650                                            via, router,
1651                                            body->oa.o_valid & OBD_MD_FLFID ?
1652                                                 body->oa.o_parent_seq : (__u64)0,
1653                                            body->oa.o_valid & OBD_MD_FLFID ?
1654                                                 body->oa.o_parent_oid : 0,
1655                                            body->oa.o_valid & OBD_MD_FLFID ?
1656                                                 body->oa.o_parent_ver : 0,
1657                                            body->oa.o_id,
1658                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1659                                                 body->oa.o_seq : (__u64)0,
1660                                            aa->aa_ppga[0]->off,
1661                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1662                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1663                                                                         1);
1664                         CERROR("client %x, server %x, cksum_type %x\n",
1665                                client_cksum, server_cksum, cksum_type);
1666                         cksum_counter = 0;
1667                         aa->aa_oa->o_cksum = client_cksum;
1668                         rc = -EAGAIN;
1669                 } else {
1670                         cksum_counter++;
1671                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1672                         rc = 0;
1673                 }
1674         } else if (unlikely(client_cksum)) {
1675                 static int cksum_missed;
1676
1677                 cksum_missed++;
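                /* rate-limit: log only when cksum_missed is a power of two */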
1678                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1679                         CERROR("Checksum %u requested from %s but not sent\n",
1680                                cksum_missed, libcfs_nid2str(peer->nid));
1681         } else {
1682                 rc = 0;
1683         }
1684 out:
1685         if (rc >= 0)
1686                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1687
1688         RETURN(rc);
1689 }
1690
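/* Issue one synchronous bulk RPC and wait for it.  Bulk timeouts restart the
 * request from scratch; other recoverable errors are retried, sleeping
 * 'resends' seconds between attempts, until client_should_resend() says to
 * give up. */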
1691 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1692                             struct lov_stripe_md *lsm,
1693                             obd_count page_count, struct brw_page **pga,
1694                             struct obd_capa *ocapa)
1695 {
1696         struct ptlrpc_request *req;
1697         int                    rc;
1698         cfs_waitq_t            waitq;
1699         int                    resends = 0;
1700         struct l_wait_info     lwi;
1701
1702         ENTRY;
1703
1704         cfs_waitq_init(&waitq);
1705
1706 restart_bulk:
1707         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1708                                   page_count, pga, &req, ocapa, 0, resends);
1709         if (rc != 0)
1710                 return (rc);
1711
1712         rc = ptlrpc_queue_wait(req);
1713
1714         if (rc == -ETIMEDOUT && req->rq_resend) {
1715                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1716                 ptlrpc_req_finished(req);
1717                 goto restart_bulk;
1718         }
1719
1720         rc = osc_brw_fini_request(req, rc);
1721
1722         ptlrpc_req_finished(req);
1723         if (osc_recoverable_error(rc)) {
1724                 resends++;
1725                 if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
1726                         CERROR("too many resend retries, returning error\n");
1727                         RETURN(-EIO);
1728                 }
1729
1730                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1731                 l_wait_event(waitq, 0, &lwi);
1732
1733                 goto restart_bulk;
1734         }
1735
1736         RETURN (rc);
1737 }
1738
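/* Rebuild a bulk request that failed with a recoverable error and queue the
 * replacement on the same request set.  The new request takes over the pga,
 * oaps and async args of the old one. */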
1739 int osc_brw_redo_request(struct ptlrpc_request *request,
1740                          struct osc_brw_async_args *aa)
1741 {
1742         struct ptlrpc_request *new_req;
1743         struct ptlrpc_request_set *set = request->rq_set;
1744         struct osc_brw_async_args *new_aa;
1745         struct osc_async_page *oap;
1746         int rc = 0;
1747         ENTRY;
1748
1749         if (!client_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
1751                 RETURN(-EIO);
1752         }
1753
1754         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1755
1756         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1757                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1758                                   aa->aa_cli, aa->aa_oa,
1759                                   NULL /* lsm unused by osc currently */,
1760                                   aa->aa_page_count, aa->aa_ppga,
1761                                   &new_req, aa->aa_ocapa, 0, 1);
1762         if (rc)
1763                 RETURN(rc);
1764
1765         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1766
1767         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1768                 if (oap->oap_request != NULL) {
1769                         LASSERTF(request == oap->oap_request,
1770                                  "request %p != oap_request %p\n",
1771                                  request, oap->oap_request);
1772                         if (oap->oap_interrupted) {
1773                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1774                                 ptlrpc_req_finished(new_req);
1775                                 RETURN(-EINTR);
1776                         }
1777                 }
1778         }
1779         /* New request takes over pga and oaps from old request.
1780          * Note that copying a list_head doesn't work, need to move it... */
1781         aa->aa_resends++;
1782         new_req->rq_interpret_reply = request->rq_interpret_reply;
1783         new_req->rq_async_args = request->rq_async_args;
1784         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1785
1786         new_aa = ptlrpc_req_async_args(new_req);
1787
1788         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1789         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1790         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1791
1792         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1793                 if (oap->oap_request) {
1794                         ptlrpc_req_finished(oap->oap_request);
1795                         oap->oap_request = ptlrpc_request_addref(new_req);
1796                 }
1797         }
1798
1799         new_aa->aa_ocapa = aa->aa_ocapa;
1800         aa->aa_ocapa = NULL;
1801
        /* Using ptlrpc_set_add_req() is safe because interpret functions
         * run in check_set context.  The only path by which another thread
         * can access the request and return -EINTR is protected by
         * cl_loi_list_lock. */
1806         ptlrpc_set_add_req(set, new_req);
1807
1808         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1809
1810         DEBUG_REQ(D_INFO, new_req, "new request");
1811         RETURN(0);
1812 }
1813
/*
 * Ugh, we want disk allocation on the target to happen in offset order.  We'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation.  It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's 1 and the array is sorted.
 */
1821 static void sort_brw_pages(struct brw_page **array, int num)
1822 {
1823         int stride, i, j;
1824         struct brw_page *tmp;
1825
1826         if (num == 1)
1827                 return;
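        /* grow the stride through Knuth's sequence 1, 4, 13, 40, ...
         * (h = 3h + 1) until it covers the array */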
1828         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1829                 ;
1830
1831         do {
1832                 stride /= 3;
1833                 for (i = stride ; i < num ; i++) {
1834                         tmp = array[i];
1835                         j = i;
1836                         while (j >= stride && array[j - stride]->off > tmp->off) {
1837                                 array[j] = array[j - stride];
1838                                 j -= stride;
1839                         }
1840                         array[j] = tmp;
1841                 }
1842         } while (stride > 1);
1843 }
1844
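/* Count how many leading pages of @pg form a single unfragmented chunk: the
 * run ends at the first page that does not finish on a page boundary, or at
 * the first subsequent page that does not start on one.  Used to cap how many
 * pages go into one RPC. */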
1845 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1846 {
1847         int count = 1;
1848         int offset;
1849         int i = 0;
1850
        LASSERT(pages > 0);
1852         offset = pg[i]->off & ~CFS_PAGE_MASK;
1853
1854         for (;;) {
1855                 pages--;
1856                 if (pages == 0)         /* that's all */
1857                         return count;
1858
1859                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1860                         return count;   /* doesn't end on page boundary */
1861
1862                 i++;
1863                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1864                 if (offset != 0)        /* doesn't start on page boundary */
1865                         return count;
1866
1867                 count++;
1868         }
1869 }
1870
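/* Build an array of pointers into the flat brw_page array, so the pages can
 * be sorted and carved into per-RPC chunks without copying them. */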
1871 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1872 {
1873         struct brw_page **ppga;
1874         int i;
1875
1876         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1877         if (ppga == NULL)
1878                 return NULL;
1879
1880         for (i = 0; i < count; i++)
1881                 ppga[i] = pga + i;
1882         return ppga;
1883 }
1884
1885 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1886 {
1887         LASSERT(ppga != NULL);
1888         OBD_FREE(ppga, sizeof(*ppga) * count);
1889 }
1890
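/* Synchronous bulk I/O entry point: sort the pages by offset, then issue one
 * blocking RPC per chunk of at most cl_max_pages_per_rpc unfragmented pages,
 * saving and restoring the obdo around each call since the RPC clobbers it. */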
1891 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1892                    obd_count page_count, struct brw_page *pga,
1893                    struct obd_trans_info *oti)
1894 {
1895         struct obdo *saved_oa = NULL;
1896         struct brw_page **ppga, **orig;
1897         struct obd_import *imp = class_exp2cliimp(exp);
1898         struct client_obd *cli;
1899         int rc, page_count_orig;
1900         ENTRY;
1901
1902         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1903         cli = &imp->imp_obd->u.cli;
1904
1905         if (cmd & OBD_BRW_CHECK) {
1906                 /* The caller just wants to know if there's a chance that this
1907                  * I/O can succeed */
1908
1909                 if (imp->imp_invalid)
1910                         RETURN(-EIO);
1911                 RETURN(0);
1912         }
1913
1914         /* test_brw with a failed create can trip this, maybe others. */
1915         LASSERT(cli->cl_max_pages_per_rpc);
1916
1917         rc = 0;
1918
1919         orig = ppga = osc_build_ppga(pga, page_count);
1920         if (ppga == NULL)
1921                 RETURN(-ENOMEM);
1922         page_count_orig = page_count;
1923
1924         sort_brw_pages(ppga, page_count);
1925         while (page_count) {
1926                 obd_count pages_per_brw;
1927
1928                 if (page_count > cli->cl_max_pages_per_rpc)
1929                         pages_per_brw = cli->cl_max_pages_per_rpc;
1930                 else
1931                         pages_per_brw = page_count;
1932
1933                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1934
1935                 if (saved_oa != NULL) {
1936                         /* restore previously saved oa */
1937                         *oinfo->oi_oa = *saved_oa;
1938                 } else if (page_count > pages_per_brw) {
1939                         /* save a copy of oa (brw will clobber it) */
1940                         OBDO_ALLOC(saved_oa);
1941                         if (saved_oa == NULL)
1942                                 GOTO(out, rc = -ENOMEM);
1943                         *saved_oa = *oinfo->oi_oa;
1944                 }
1945
1946                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1947                                       pages_per_brw, ppga, oinfo->oi_capa);
1948
1949                 if (rc != 0)
1950                         break;
1951
1952                 page_count -= pages_per_brw;
1953                 ppga += pages_per_brw;
1954         }
1955
1956 out:
1957         osc_release_ppga(orig, page_count_orig);
1958
1959         if (saved_oa != NULL)
1960                 OBDO_FREE(saved_oa);
1961
1962         RETURN(rc);
1963 }
1964
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting: either writeback completed, or truncate happened
 * before writing started.  Must be called with the loi lock held. */
1968 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1969                            int sent)
1970 {
1971         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1972 }
1973
1974
1975 /* This maintains the lists of pending pages to read/write for a given object
1976  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1977  * to quickly find objects that are ready to send an RPC. */
1978 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1979                          int cmd)
1980 {
1981         int optimal;
1982         ENTRY;
1983
1984         if (lop->lop_num_pending == 0)
1985                 RETURN(0);
1986
1987         /* if we have an invalid import we want to drain the queued pages
1988          * by forcing them through rpcs that immediately fail and complete
1989          * the pages.  recovery relies on this to empty the queued pages
1990          * before canceling the locks and evicting down the llite pages */
1991         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1992                 RETURN(1);
1993
        /* stream rpcs in queue order as long as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
1998         if (!cfs_list_empty(&lop->lop_urgent)) {
1999                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
2000                 RETURN(1);
2001         }
2002         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
2003         optimal = cli->cl_max_pages_per_rpc;
2004         if (cmd & OBD_BRW_WRITE) {
2005                 /* trigger a write rpc stream as long as there are dirtiers
2006                  * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting. */
2008                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
2009                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
2010                         RETURN(1);
2011                 }
2012                 /* +16 to avoid triggering rpcs that would want to include pages
2013                  * that are being queued but which can't be made ready until
2014                  * the queuer finishes with the page. this is a wart for
2015                  * llite::commit_write() */
2016                 optimal += 16;
2017         }
2018         if (lop->lop_num_pending >= optimal)
2019                 RETURN(1);
2020
2021         RETURN(0);
2022 }
2023
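/* Return 1 if the oldest urgent page is marked ASYNC_HP; such pages force an
 * immediate high-priority RPC, typically because the lock covering them is
 * being cancelled. */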
2024 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2025 {
2026         struct osc_async_page *oap;
2027         ENTRY;
2028
2029         if (cfs_list_empty(&lop->lop_urgent))
2030                 RETURN(0);
2031
2032         oap = cfs_list_entry(lop->lop_urgent.next,
2033                          struct osc_async_page, oap_urgent_item);
2034
2035         if (oap->oap_async_flags & ASYNC_HP) {
2036                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2037                 RETURN(1);
2038         }
2039
2040         RETURN(0);
2041 }
2042
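/* Keep @item's membership of @list in sync with the @should_be_on
 * predicate. */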
2043 static void on_list(cfs_list_t *item, cfs_list_t *list,
2044                     int should_be_on)
2045 {
2046         if (cfs_list_empty(item) && should_be_on)
2047                 cfs_list_add_tail(item, list);
2048         else if (!cfs_list_empty(item) && !should_be_on)
2049                 cfs_list_del_init(item);
2050 }
2051
2052 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2053  * can find pages to build into rpcs quickly */
2054 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2055 {
2056         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2057             lop_makes_hprpc(&loi->loi_read_lop)) {
2058                 /* HP rpc */
2059                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2060                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2061         } else {
2062                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2063                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2064                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2065                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2066         }
2067
2068         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2069                 loi->loi_write_lop.lop_num_pending);
2070
2071         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2072                 loi->loi_read_lop.lop_num_pending);
2073 }
2074
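/* Adjust the pending-page count on the lop and the matching per-client
 * read or write total. */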
2075 static void lop_update_pending(struct client_obd *cli,
2076                                struct loi_oap_pages *lop, int cmd, int delta)
2077 {
2078         lop->lop_num_pending += delta;
2079         if (cmd & OBD_BRW_WRITE)
2080                 cli->cl_pending_w_pages += delta;
2081         else
2082                 cli->cl_pending_r_pages += delta;
2083 }
2084
2085 /**
2086  * this is called when a sync waiter receives an interruption.  Its job is to
2087  * get the caller woken as soon as possible.  If its page hasn't been put in an
2088  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2089  * desiring interruption which will forcefully complete the rpc once the rpc
2090  * has timed out.
2091  */
2092 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2093 {
2094         struct loi_oap_pages *lop;
2095         struct lov_oinfo *loi;
2096         int rc = -EBUSY;
2097         ENTRY;
2098
2099         LASSERT(!oap->oap_interrupted);
2100         oap->oap_interrupted = 1;
2101
2102         /* ok, it's been put in an rpc. only one oap gets a request reference */
2103         if (oap->oap_request != NULL) {
2104                 ptlrpc_mark_interrupted(oap->oap_request);
2105                 ptlrpcd_wake(oap->oap_request);
2106                 ptlrpc_req_finished(oap->oap_request);
2107                 oap->oap_request = NULL;
2108         }
2109
        /*
         * page completion may be called only if the ->cpo_prep() method was
         * executed by osc_io_submit(), which also adds the page to the
         * pending list
         */
2114         if (!cfs_list_empty(&oap->oap_pending_item)) {
2115                 cfs_list_del_init(&oap->oap_pending_item);
2116                 cfs_list_del_init(&oap->oap_urgent_item);
2117
2118                 loi = oap->oap_loi;
2119                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2120                         &loi->loi_write_lop : &loi->loi_read_lop;
2121                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2122                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2123                 rc = oap->oap_caller_ops->ap_completion(env,
2124                                           oap->oap_caller_data,
2125                                           oap->oap_cmd, NULL, -EINTR);
2126         }
2127
2128         RETURN(rc);
2129 }
2130
/* this is trying to propagate async writeback errors back up to the
 * application.  When an async write fails we record the error code in case
 * the app later does an fsync.  As long as errors persist we force future
 * rpcs to be sync so that the app can get a sync error and break the cycle
 * of queueing pages for which writeback will fail. */
2136 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2137                            int rc)
2138 {
2139         if (rc) {
2140                 if (!ar->ar_rc)
2141                         ar->ar_rc = rc;
2142
2143                 ar->ar_force_sync = 1;
2144                 ar->ar_min_xid = ptlrpc_sample_next_xid();
                return;
        }
2148
2149         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2150                 ar->ar_force_sync = 0;
2151 }
2152
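/* Queue an oap on its object's pending list; ASYNC_HP pages also go to the
 * head of the urgent list, ASYNC_URGENT pages to its tail. */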
2153 void osc_oap_to_pending(struct osc_async_page *oap)
2154 {
2155         struct loi_oap_pages *lop;
2156
2157         if (oap->oap_cmd & OBD_BRW_WRITE)
2158                 lop = &oap->oap_loi->loi_write_lop;
2159         else
2160                 lop = &oap->oap_loi->loi_read_lop;
2161
2162         if (oap->oap_async_flags & ASYNC_HP)
2163                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2164         else if (oap->oap_async_flags & ASYNC_URGENT)
2165                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2166         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2167         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2168 }
2169
2170 /* this must be called holding the loi list lock to give coverage to exit_cache,
2171  * async_flag maintenance, and oap_request */
2172 static void osc_ap_completion(const struct lu_env *env,
2173                               struct client_obd *cli, struct obdo *oa,
2174                               struct osc_async_page *oap, int sent, int rc)
2175 {
2176         __u64 xid = 0;
2177
2178         ENTRY;
2179         if (oap->oap_request != NULL) {
2180                 xid = ptlrpc_req_xid(oap->oap_request);
2181                 ptlrpc_req_finished(oap->oap_request);
2182                 oap->oap_request = NULL;
2183         }
2184
2185         cfs_spin_lock(&oap->oap_lock);
2186         oap->oap_async_flags = 0;
2187         cfs_spin_unlock(&oap->oap_lock);
2188         oap->oap_interrupted = 0;
2189
2190         if (oap->oap_cmd & OBD_BRW_WRITE) {
2191                 osc_process_ar(&cli->cl_ar, xid, rc);
2192                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2193         }
2194
2195         if (rc == 0 && oa != NULL) {
2196                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2197                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2198                 if (oa->o_valid & OBD_MD_FLMTIME)
2199                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2200                 if (oa->o_valid & OBD_MD_FLATIME)
2201                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2202                 if (oa->o_valid & OBD_MD_FLCTIME)
2203                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2204         }
2205
2206         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2207                                                 oap->oap_cmd, oa, rc);
2208
        /* ll_ap_completion (from llite) drops PG_locked, so a new
         * I/O on the page could start.  But OSC calls it under lock,
         * and thus we can add the oap back to pending safely */
2212         if (rc)
2213                 /* upper layer wants to leave the page on pending queue */
2214                 osc_oap_to_pending(oap);
2215         else
2216                 osc_exit_cache(cli, oap, sent);
2217         EXIT;
2218 }
2219
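/* Interpret callback for an async bulk RPC: finish the request, redo it if
 * the error is recoverable, then complete the attached oaps (or, for pages
 * submitted without oaps, just release their write grants) and poke the
 * client to send more RPCs. */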
2220 static int brw_interpret(const struct lu_env *env,
2221                          struct ptlrpc_request *req, void *data, int rc)
2222 {
2223         struct osc_brw_async_args *aa = data;
2224         struct client_obd *cli;
2225         int async;
2226         ENTRY;
2227
2228         rc = osc_brw_fini_request(req, rc);
2229         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2230         if (osc_recoverable_error(rc)) {
                /* Only retry once for mmapped files since the mmapped page
                 * might be modified at any time.  We have to retry at least
                 * once in case there really WAS a corruption of the page
                 * on the network that was not caused by mmap() modifying
                 * the page. Bug11742 */
2236                 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2237                     aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2238                     aa->aa_oa->o_flags & OBD_FL_MMAP) {
2239                         rc = 0;
2240                 } else {
2241                         rc = osc_brw_redo_request(req, aa);
2242                         if (rc == 0)
2243                                 RETURN(0);
2244                 }
2245         }
2246
2247         if (aa->aa_ocapa) {
2248                 capa_put(aa->aa_ocapa);
2249                 aa->aa_ocapa = NULL;
2250         }
2251
2252         cli = aa->aa_cli;
2253
2254         client_obd_list_lock(&cli->cl_loi_list_lock);
2255
2256         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2257          * is called so we know whether to go to sync BRWs or wait for more
2258          * RPCs to complete */
2259         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2260                 cli->cl_w_in_flight--;
2261         else
2262                 cli->cl_r_in_flight--;
2263
2264         async = cfs_list_empty(&aa->aa_oaps);
2265         if (!async) { /* from osc_send_oap_rpc() */
2266                 struct osc_async_page *oap, *tmp;
2267                 /* the caller may re-use the oap after the completion call so
2268                  * we need to clean it up a little */
2269                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2270                                              oap_rpc_item) {
2271                         cfs_list_del_init(&oap->oap_rpc_item);
2272                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2273                 }
2274                 OBDO_FREE(aa->aa_oa);
2275         } else { /* from async_internal() */
2276                 obd_count i;
2277                 for (i = 0; i < aa->aa_page_count; i++)
2278                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2279         }
2280         osc_wake_cache_waiters(cli);
2281         osc_check_rpcs(env, cli);
2282         client_obd_list_unlock(&cli->cl_loi_list_lock);
2283         if (!async)
2284                 cl_req_completion(env, aa->aa_clerq, rc);
2285         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2286
2287         RETURN(rc);
2288 }
2289
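/* Build a single read or write RPC from the oaps on @rpc_list: allocate the
 * pga array and obdo, fill the obdo from the cl_req attributes, sort the
 * pages and hand them to osc_brw_prep_request().  On failure every queued
 * oap is completed with the error. */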
2290 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2291                                             struct client_obd *cli,
2292                                             cfs_list_t *rpc_list,
2293                                             int page_count, int cmd)
2294 {
2295         struct ptlrpc_request *req;
2296         struct brw_page **pga = NULL;
2297         struct osc_brw_async_args *aa;
2298         struct obdo *oa = NULL;
2299         const struct obd_async_page_ops *ops = NULL;
2300         void *caller_data = NULL;
2301         struct osc_async_page *oap;
2302         struct osc_async_page *tmp;
2303         struct ost_body *body;
2304         struct cl_req *clerq = NULL;
2305         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2306         struct ldlm_lock *lock = NULL;
2307         struct cl_req_attr crattr;
2308         int i, rc, mpflag = 0;
2309
2310         ENTRY;
2311         LASSERT(!cfs_list_empty(rpc_list));
2312
2313         if (cmd & OBD_BRW_MEMALLOC)
2314                 mpflag = cfs_memory_pressure_get_and_set();
2315
2316         memset(&crattr, 0, sizeof crattr);
2317         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2318         if (pga == NULL)
2319                 GOTO(out, req = ERR_PTR(-ENOMEM));
2320
2321         OBDO_ALLOC(oa);
2322         if (oa == NULL)
2323                 GOTO(out, req = ERR_PTR(-ENOMEM));
2324
2325         i = 0;
2326         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2327                 struct cl_page *page = osc_oap2cl_page(oap);
2328                 if (ops == NULL) {
2329                         ops = oap->oap_caller_ops;
2330                         caller_data = oap->oap_caller_data;
2331
2332                         clerq = cl_req_alloc(env, page, crt,
2333                                              1 /* only 1-object rpcs for
2334                                                 * now */);
2335                         if (IS_ERR(clerq))
2336                                 GOTO(out, req = (void *)clerq);
2337                         lock = oap->oap_ldlm_lock;
2338                 }
2339                 pga[i] = &oap->oap_brw_page;
2340                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2341                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2342                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2343                 i++;
2344                 cl_req_page_add(env, clerq, page);
2345         }
2346
2347         /* always get the data for the obdo for the rpc */
2348         LASSERT(ops != NULL);
2349         crattr.cra_oa = oa;
2350         crattr.cra_capa = NULL;
2351         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2352         if (lock) {
2353                 oa->o_handle = lock->l_remote_handle;
2354                 oa->o_valid |= OBD_MD_FLHANDLE;
2355         }
2356
2357         rc = cl_req_prep(env, clerq);
2358         if (rc != 0) {
2359                 CERROR("cl_req_prep failed: %d\n", rc);
2360                 GOTO(out, req = ERR_PTR(rc));
2361         }
2362
2363         sort_brw_pages(pga, page_count);
2364         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2365                                   pga, &req, crattr.cra_capa, 1, 0);
2366         if (rc != 0) {
2367                 CERROR("prep_req failed: %d\n", rc);
2368                 GOTO(out, req = ERR_PTR(rc));
2369         }
2370
2371         if (cmd & OBD_BRW_MEMALLOC)
2372                 req->rq_memalloc = 1;
2373
2374         /* Need to update the timestamps after the request is built in case
2375          * we race with setattr (locally or in queue at OST).  If OST gets
2376          * later setattr before earlier BRW (as determined by the request xid),
2377          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2378          * way to do this in a single call.  bug 10150 */
2379         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2380         cl_req_attr_set(env, clerq, &crattr,
2381                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2382
2383         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2384         aa = ptlrpc_req_async_args(req);
2385         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2386         cfs_list_splice(rpc_list, &aa->aa_oaps);
2387         CFS_INIT_LIST_HEAD(rpc_list);
2388         aa->aa_clerq = clerq;
2389 out:
2390         if (cmd & OBD_BRW_MEMALLOC)
2391                 cfs_memory_pressure_restore(mpflag);
2392
2393         capa_put(crattr.cra_capa);
2394         if (IS_ERR(req)) {
2395                 if (oa)
2396                         OBDO_FREE(oa);
2397                 if (pga)
2398                         OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad; it makes the
                 * pending list not follow the dirty order */
2401                 client_obd_list_lock(&cli->cl_loi_list_lock);
2402                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2403                         cfs_list_del_init(&oap->oap_rpc_item);
2404
2405                         /* queued sync pages can be torn down while the pages
2406                          * were between the pending list and the rpc */
2407                         if (oap->oap_interrupted) {
2408                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2409                                 osc_ap_completion(env, cli, NULL, oap, 0,
2410                                                   oap->oap_count);
2411                                 continue;
2412                         }
2413                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2414                 }
2415                 if (clerq && !IS_ERR(clerq))
2416                         cl_req_completion(env, clerq, PTR_ERR(req));
2417         }
2418         RETURN(req);
2419 }
2420
2421 /**
2422  * prepare pages for ASYNC io and put pages in send queue.
2423  *
 * \param cmd OBD_BRW_* macros
2425  * \param lop pending pages
2426  *
2427  * \return zero if no page added to send queue.
2428  * \return 1 if pages successfully added to send queue.
2429  * \return negative on errors.
2430  */
2431 static int
2432 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2433                  struct lov_oinfo *loi,
2434                  int cmd, struct loi_oap_pages *lop)
2435 {
2436         struct ptlrpc_request *req;
2437         obd_count page_count = 0;
2438         struct osc_async_page *oap = NULL, *tmp;
2439         struct osc_brw_async_args *aa;
2440         const struct obd_async_page_ops *ops;
2441         CFS_LIST_HEAD(rpc_list);
2442         int srvlock = 0, mem_tight = 0;
2443         struct cl_object *clob = NULL;
2444         obd_off starting_offset = OBD_OBJECT_EOF;
2445         unsigned int ending_offset;
2446         int starting_page_off = 0;
2447         ENTRY;
2448
        /* ASYNC_HP pages first.  At present, when the lock covering the
         * pages is to be cancelled, the pages under it are sent out with
         * ASYNC_HP.  We have to send them out as soon as possible. */
2452         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2453                 if (oap->oap_async_flags & ASYNC_HP)
2454                         cfs_list_move(&oap->oap_pending_item, &lop->lop_pending);
2455                 if (++page_count >= cli->cl_max_pages_per_rpc)
2456                         break;
2457         }
2458         page_count = 0;
2459
2460         /* first we find the pages we're allowed to work with */
2461         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2462                                      oap_pending_item) {
2463                 ops = oap->oap_caller_ops;
2464
2465                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2466                          "magic 0x%x\n", oap, oap->oap_magic);
2467
2468                 if (clob == NULL) {
2469                         /* pin object in memory, so that completion call-backs
2470                          * can be safely called under client_obd_list lock. */
2471                         clob = osc_oap2cl_page(oap)->cp_obj;
2472                         cl_object_get(clob);
2473                 }
2474
2475                 if (page_count != 0 &&
2476                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2477                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2478                                " oap %p, page %p, srvlock %u\n",
2479                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2480                         break;
2481                 }
2482
2483                 /* If there is a gap at the start of this page, it can't merge
2484                  * with any previous page, so we'll hand the network a
2485                  * "fragmented" page array that it can't transfer in 1 RDMA */
2486                 if (oap->oap_obj_off < starting_offset) {
2487                         if (starting_page_off != 0)
2488                                 break;
2489
2490                         starting_page_off = oap->oap_page_off;
2491                         starting_offset = oap->oap_obj_off + starting_page_off;
2492                 } else if (oap->oap_page_off != 0)
2493                         break;
2494
                /* in llite, being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page, we don't want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not-ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
2504                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2505                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2506                                                     cmd);
2507                         if (rc < 0)
2508                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2509                                                 "instead of ready\n", oap,
2510                                                 oap->oap_page, rc);
2511                         switch (rc) {
2512                         case -EAGAIN:
2513                                 /* llite is telling us that the page is still
2514                                  * in commit_write and that we should try
2515                                  * and put it in an rpc again later.  we
2516                                  * break out of the loop so we don't create
2517                                  * a hole in the sequence of pages in the rpc
2518                                  * stream.*/
2519                                 oap = NULL;
2520                                 break;
2521                         case -EINTR:
2522                                 /* the io isn't needed.. tell the checks
2523                                  * below to complete the rpc with EINTR */
2524                                 cfs_spin_lock(&oap->oap_lock);
2525                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2526                                 cfs_spin_unlock(&oap->oap_lock);
2527                                 oap->oap_count = -EINTR;
2528                                 break;
2529                         case 0:
2530                                 cfs_spin_lock(&oap->oap_lock);
2531                                 oap->oap_async_flags |= ASYNC_READY;
2532                                 cfs_spin_unlock(&oap->oap_lock);
2533                                 break;
2534                         default:
2535                                 LASSERTF(0, "oap %p page %p returned %d "
2536                                             "from make_ready\n", oap,
2537                                             oap->oap_page, rc);
2538                                 break;
2539                         }
2540                 }
2541                 if (oap == NULL)
2542                         break;
2543                 /*
2544                  * Page submitted for IO has to be locked. Either by
2545                  * ->ap_make_ready() or by higher layers.
2546                  */
2547 #if defined(__KERNEL__) && defined(__linux__)
2548                 {
2549                         struct cl_page *page;
2550
2551                         page = osc_oap2cl_page(oap);
2552
2553                         if (page->cp_type == CPT_CACHEABLE &&
2554                             !(PageLocked(oap->oap_page) &&
2555                               (CheckWriteback(oap->oap_page, cmd)))) {
2556                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2557                                        oap->oap_page,
2558                                        (long)oap->oap_page->flags,
2559                                        oap->oap_async_flags);
2560                                 LBUG();
2561                         }
2562                 }
2563 #endif
2564
2565                 /* take the page out of our book-keeping */
2566                 cfs_list_del_init(&oap->oap_pending_item);
2567                 lop_update_pending(cli, lop, cmd, -1);
2568                 cfs_list_del_init(&oap->oap_urgent_item);
2569
2570                 /* ask the caller for the size of the io as the rpc leaves. */
2571                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2572                         oap->oap_count =
2573                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2574                                                       cmd);
2575                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2576                 }
2577                 if (oap->oap_count <= 0) {
2578                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2579                                oap->oap_count);
2580                         osc_ap_completion(env, cli, NULL,
2581                                           oap, 0, oap->oap_count);
2582                         continue;
2583                 }
2584
2585                 /* now put the page back in our accounting */
2586                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2587                 if (page_count++ == 0)
2588                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2589
2590                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2591                         mem_tight = 1;
2592
2593                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2594                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2595                  * have the same alignment as the initial writes that allocated
2596                  * extents on the server. */
2597                 ending_offset = oap->oap_obj_off + oap->oap_page_off +
2598                                 oap->oap_count;
2599                 if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
2600                         break;
2601
2602                 if (page_count >= cli->cl_max_pages_per_rpc)
2603                         break;
2604
2605                 /* If there is a gap at the end of this page, it can't merge
2606                  * with any subsequent pages, so we'll hand the network a
2607                  * "fragmented" page array that it can't transfer in 1 RDMA */
2608                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2609                         break;
2610         }
2611
2612         osc_wake_cache_waiters(cli);
2613
2614         loi_list_maint(cli, loi);
2615
2616         client_obd_list_unlock(&cli->cl_loi_list_lock);
2617
2618         if (clob != NULL)
2619                 cl_object_put(env, clob);
2620
2621         if (page_count == 0) {
2622                 client_obd_list_lock(&cli->cl_loi_list_lock);
2623                 RETURN(0);
2624         }
2625
2626         req = osc_build_req(env, cli, &rpc_list, page_count,
2627                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2628         if (IS_ERR(req)) {
2629                 LASSERT(cfs_list_empty(&rpc_list));
2630                 loi_list_maint(cli, loi);
2631                 RETURN(PTR_ERR(req));
2632         }
2633
2634         aa = ptlrpc_req_async_args(req);
2635
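        /* record the RPC's starting page offset within a PTLRPC_MAX_BRW_SIZE
         * window in the per-client offset histograms */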
2636         starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
2637         if (cmd == OBD_BRW_READ) {
2638                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2639                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2640                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2641                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2642         } else {
2643                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2644                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2645                                  cli->cl_w_in_flight);
2646                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2647                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2648         }
2649         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2650
2651         client_obd_list_lock(&cli->cl_loi_list_lock);
2652
2653         if (cmd == OBD_BRW_READ)
2654                 cli->cl_r_in_flight++;
2655         else
2656                 cli->cl_w_in_flight++;
2657
2658         /* queued sync pages can be torn down while the pages
2659          * were between the pending list and the rpc */
2660         tmp = NULL;
2661         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2662                 /* only one oap gets a request reference */
2663                 if (tmp == NULL)
2664                         tmp = oap;
2665                 if (oap->oap_interrupted && !req->rq_intr) {
2666                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2667                                oap, req);
2668                         ptlrpc_mark_interrupted(req);
2669                 }
2670         }
2671         if (tmp != NULL)
2672                 tmp->oap_request = ptlrpc_request_addref(req);
2673
2674         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2675                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2676
2677         req->rq_interpret_reply = brw_interpret;
2678         ptlrpcd_add_req(req, PSCOPE_BRW);
2679         RETURN(1);
2680 }
2681
2682 #define LOI_DEBUG(LOI, STR, args...)                                     \
2683         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2684                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2685                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2686                (LOI)->loi_write_lop.lop_num_pending,                     \
2687                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2688                (LOI)->loi_read_lop.lop_num_pending,                      \
2689                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2690                args)
2691
2692 /* This is called by osc_check_rpcs() to find which objects have pages that
2693  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2694 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2695 {
2696         ENTRY;
2697
2698         /* First return objects that have blocked locks so that they
2699          * will be flushed quickly and other clients can get the lock,
2700          * then objects which have pages ready to be stuffed into RPCs */
2701         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2702                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2703                                       struct lov_oinfo, loi_hp_ready_item));
2704         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2705                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2706                                       struct lov_oinfo, loi_ready_item));
2707
2708         /* then if we have cache waiters, return all objects with queued
2709          * writes.  This is especially important when many small files
2710          * have filled up the cache but have not been fired into rpcs
2711          * because they don't pass the nr_pending/object threshold */
2712         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2713             !cfs_list_empty(&cli->cl_loi_write_list))
2714                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2715                                       struct lov_oinfo, loi_write_item));
2716
2717         /* then return all queued objects when we have an invalid import
2718          * so that they get flushed */
2719         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2720                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2721                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2722                                               struct lov_oinfo,
2723                                               loi_write_item));
2724                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2725                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2726                                               struct lov_oinfo, loi_read_item));
2727         }
2728         RETURN(NULL);
2729 }
2730
2731 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2732 {
2733         struct osc_async_page *oap;
2734         int hprpc = 0;
2735
2736         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2737                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2738                                      struct osc_async_page, oap_urgent_item);
2739                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2740         }
2741
2742         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2743                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2744                                      struct osc_async_page, oap_urgent_item);
2745                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2746         }
2747
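             /* Note: an object whose next urgent page is marked ASYNC_HP is
              * allowed one extra RPC slot beyond cl_max_rpcs_in_flight, so a
              * high-priority flush is not starved by a full pipe of normal
              * RPCs. */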
2748         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2749 }
2750
2751 /* called with the loi list lock held */
2752 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2753 {
2754         struct lov_oinfo *loi;
2755         int rc = 0, race_counter = 0;
2756         ENTRY;
2757
2758         while ((loi = osc_next_loi(cli)) != NULL) {
2759                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2760
2761                 if (osc_max_rpc_in_flight(cli, loi))
2762                         break;
2763
2764                 /* attempt some read/write balancing by alternating between
2765                  * reads and writes on an object.  The lop_makes_rpc() checks
2766                  * here would be redundant if we were getting read/write work
2767                  * items instead of objects.  We don't want osc_send_oap_rpc()
2768                  * to drain a partial read pending queue when we're given this
2769                  * object for write io while there are cache waiters */
2770                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2771                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2772                                               &loi->loi_write_lop);
2773                         if (rc < 0) {
2774                                 CERROR("Write request failed with %d\n", rc);
2775
2776                                 /* osc_send_oap_rpc failed, mostly because of
2777                                  * memory pressure.
2778                                  *
2779                                  * We can't break out here, because if:
2780                                  *  - a page was submitted by osc_io_submit,
2781                                  *    so that page is locked;
2782                                  *  - no request is in flight; and
2783                                  *  - no subsequent request will be sent,
2784                                  * then the system would live-lock: there
2785                                  * would be no further chance to call
2786                                  * osc_io_unplug() and osc_check_rpcs().
2787                                  * pdflush can't help in this case either,
2788                                  * because it might be blocked grabbing the
2789                                  * page lock mentioned above.
2790                                  *
2791                                  * Anyway, continue to drain pages. */
2792                                 /* break; */
2793                         }
2794
2795                         if (rc > 0)
2796                                 race_counter = 0;
2797                         else if (rc == 0)
2798                                 race_counter++;
2799                 }
2800                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2801                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2802                                               &loi->loi_read_lop);
2803                         if (rc < 0)
2804                                 CERROR("Read request failed with %d\n", rc);
2805
2806                         if (rc > 0)
2807                                 race_counter = 0;
2808                         else if (rc == 0)
2809                                 race_counter++;
2810                 }
2811
2812                 /* attempt some inter-object balancing by issuing rpcs
2813                  * for each object in turn */
2814                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2815                         cfs_list_del_init(&loi->loi_hp_ready_item);
2816                 if (!cfs_list_empty(&loi->loi_ready_item))
2817                         cfs_list_del_init(&loi->loi_ready_item);
2818                 if (!cfs_list_empty(&loi->loi_write_item))
2819                         cfs_list_del_init(&loi->loi_write_item);
2820                 if (!cfs_list_empty(&loi->loi_read_item))
2821                         cfs_list_del_init(&loi->loi_read_item);
2822
2823                 loi_list_maint(cli, loi);
2824
2825                 /* send_oap_rpc fails with 0 when make_ready tells it to
2826                  * back off.  llite's make_ready does this when it tries
2827                  * to lock a page queued for write that is already locked.
2828                  * We want to try sending rpcs from many objects, but we
2829                  * don't want to spin failing with 0.  */
2830                 if (race_counter == 10)
2831                         break;
2832         }
2833         EXIT;
2834 }
2835
2836 /* we're trying to queue a page in the osc so we're subject to the
2837  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2838  * If the osc's queued pages are already at that limit, then we want to sleep
2839  * until there is space in the osc's queue for us.  We also may be waiting for
2840  * write credits from the OST if there are RPCs in flight that may return some
2841  * before we fall back to sync writes.
2842  *
2843  * We need this to know our allocation was granted in the presence of signals */
2844 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2845 {
2846         int rc;
2847         ENTRY;
2848         client_obd_list_lock(&cli->cl_loi_list_lock);
2849         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2850         client_obd_list_unlock(&cli->cl_loi_list_lock);
2851         RETURN(rc);
2852 }
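     /* ocw_granted() is the wake-up condition for the l_wait_event() in
      * osc_enter_cache() below: the waiter has either been removed from
      * cl_cache_waiters (normally once its grant became available), or no
      * RPCs remain in flight that could return grant, so there is nothing
      * left to wait for. */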
2853
2854 /**
2855  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2856  * is available.
2857  */
2858 int osc_enter_cache_try(const struct lu_env *env,
2859                         struct client_obd *cli, struct lov_oinfo *loi,
2860                         struct osc_async_page *oap, int transient)
2861 {
2862         int has_grant;
2863
2864         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2865         if (has_grant) {
2866                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2867                 if (transient) {
2868                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2869                         cfs_atomic_inc(&obd_dirty_transit_pages);
2870                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2871                 }
2872         }
2873         return has_grant;
2874 }
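     /* Note: each page admitted by osc_enter_cache_try() consumes
      * CFS_PAGE_SIZE bytes of cl_avail_grant via osc_consume_write_grant();
      * transient pages are additionally counted in cl_dirty_transit and
      * obd_dirty_transit_pages, and tagged OBD_BRW_NOCACHE. */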
2875
2876 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2877  * grant or cache space. */
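     /* Returns 0 once the page's grant has been accounted (possibly after
      * waiting), -EDQUOT when the caller must fall back to sync i/o, and
      * -EINTR when the wait for cache space was interrupted by a signal. */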
2878 static int osc_enter_cache(const struct lu_env *env,
2879                            struct client_obd *cli, struct lov_oinfo *loi,
2880                            struct osc_async_page *oap)
2881 {
2882         struct osc_cache_waiter ocw;
2883         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2884
2885         ENTRY;
2886
2887         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2888                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2889                cli->cl_dirty_max, obd_max_dirty_pages,
2890                cli->cl_lost_grant, cli->cl_avail_grant);
2891
2892         /* force the caller to try sync io.  This can jump the list
2893          * of queued writes and create a discontiguous rpc stream */
2894         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
2895             cli->cl_dirty_max < CFS_PAGE_SIZE     ||
2896             cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
2897                 RETURN(-EDQUOT);
2898
2899         /* Hopefully normal case - cache space and write credits available */
2900         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2901             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2902             osc_enter_cache_try(env, cli, loi, oap, 0))
2903                 RETURN(0);
2904
2905         /* It is safe to block as a cache waiter as long as there is grant
2906          * space available or the hope of additional grant being returned
2907          * when an in flight write completes.  Using the write back cache
2908          * if possible is preferable to sending the data synchronously
2909          * because write pages can then be merged in to large requests.
2910          * The addition of this cache waiter will cause pending write
2911          * pages to be sent immediately. */
2912         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2913                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2914                 cfs_waitq_init(&ocw.ocw_waitq);
2915                 ocw.ocw_oap = oap;
2916                 ocw.ocw_rc = 0;
2917
2918                 loi_list_maint(cli, loi);
2919                 osc_check_rpcs(env, cli);
2920                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2921
2922                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2923                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2924
2925                 client_obd_list_lock(&cli->cl_loi_list_lock);
2926                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2927                         cfs_list_del(&ocw.ocw_entry);
2928                         RETURN(-EINTR);
2929                 }
2930                 RETURN(ocw.ocw_rc);
2931         }
2932
2933         RETURN(-EDQUOT);
2934 }
2935
2936
2937 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2938                         struct lov_oinfo *loi, cfs_page_t *page,
2939                         obd_off offset, const struct obd_async_page_ops *ops,
2940                         void *data, void **res, int nocache,
2941                         struct lustre_handle *lockh)
2942 {
2943         struct osc_async_page *oap;
2944
2945         ENTRY;
2946
2947         if (!page)
2948                 return cfs_size_round(sizeof(*oap));
2949
2950         oap = *res;
2951         oap->oap_magic = OAP_MAGIC;
2952         oap->oap_cli = &exp->exp_obd->u.cli;
2953         oap->oap_loi = loi;
2954
2955         oap->oap_caller_ops = ops;
2956         oap->oap_caller_data = data;
2957
2958         oap->oap_page = page;
2959         oap->oap_obj_off = offset;
2960         if (!client_is_remote(exp) &&
2961             cfs_capable(CFS_CAP_SYS_RESOURCE))
2962                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2963
2964         LASSERT(!(offset & ~CFS_PAGE_MASK));
2965
2966         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2967         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2968         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2969         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2970
2971         cfs_spin_lock_init(&oap->oap_lock);
2972         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2973         RETURN(0);
2974 }
2975
2976 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
2977                        struct lov_stripe_md *lsm, struct lov_oinfo *loi,
2978                        struct osc_async_page *oap, int cmd, int off,
2979                        int count, obd_flag brw_flags, enum async_flags async_flags)
2980 {
2981         struct client_obd *cli = &exp->exp_obd->u.cli;
2982         int rc = 0;
2983         ENTRY;
2984
2985         if (oap->oap_magic != OAP_MAGIC)
2986                 RETURN(-EINVAL);
2987
2988         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2989                 RETURN(-EIO);
2990
2991         if (!cfs_list_empty(&oap->oap_pending_item) ||
2992             !cfs_list_empty(&oap->oap_urgent_item) ||
2993             !cfs_list_empty(&oap->oap_rpc_item))
2994                 RETURN(-EBUSY);
2995
2996         /* check if the file's owner/group is over quota */
2997         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2998                 struct cl_object *obj;
2999                 struct cl_attr    attr; /* XXX put attr into thread info */
3000                 unsigned int qid[MAXQUOTAS];
3001
3002                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3003
3004                 cl_object_attr_lock(obj);
3005                 rc = cl_object_attr_get(env, obj, &attr);
3006                 cl_object_attr_unlock(obj);
3007
3008                 qid[USRQUOTA] = attr.cat_uid;
3009                 qid[GRPQUOTA] = attr.cat_gid;
3010                 if (rc == 0 &&
3011                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
3012                         rc = -EDQUOT;
3013                 if (rc)
3014                         RETURN(rc);
3015         }
3016
3017         if (loi == NULL)
3018                 loi = lsm->lsm_oinfo[0];
3019
3020         client_obd_list_lock(&cli->cl_loi_list_lock);
3021
3022         LASSERT(off + count <= CFS_PAGE_SIZE);
3023         oap->oap_cmd = cmd;
3024         oap->oap_page_off = off;
3025         oap->oap_count = count;
3026         oap->oap_brw_flags = brw_flags;
3027         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3028         if (cfs_memory_pressure_get())
3029                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3030         cfs_spin_lock(&oap->oap_lock);
3031         oap->oap_async_flags = async_flags;
3032         cfs_spin_unlock(&oap->oap_lock);
3033
3034         if (cmd & OBD_BRW_WRITE) {
3035                 rc = osc_enter_cache(env, cli, loi, oap);
3036                 if (rc) {
3037                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3038                         RETURN(rc);
3039                 }
3040         }
3041
3042         osc_oap_to_pending(oap);
3043         loi_list_maint(cli, loi);
3044
3045         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3046                   cmd);
3047
3048         osc_check_rpcs(env, cli);
3049         client_obd_list_unlock(&cli->cl_loi_list_lock);
3050
3051         RETURN(0);
3052 }
3053
3054 /* aka (~was & now & flag), but this is more clear :) */
3055 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
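     /* For example, SETTING(0x1, 0x3, 0x2) is 1 because the 0x2 bit is being
      * newly set, while SETTING(0x3, 0x3, 0x2) is 0 because that bit was
      * already set in "was". */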
3056
3057 int osc_set_async_flags_base(struct client_obd *cli,
3058                              struct lov_oinfo *loi, struct osc_async_page *oap,
3059                              obd_flag async_flags)
3060 {
3061         struct loi_oap_pages *lop;
3062         int flags = 0;
3063         ENTRY;
3064
3065         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3066
3067         if (oap->oap_cmd & OBD_BRW_WRITE) {
3068                 lop = &loi->loi_write_lop;
3069         } else {
3070                 lop = &loi->loi_read_lop;
3071         }
3072
3073         if ((oap->oap_async_flags & async_flags) == async_flags)
3074                 RETURN(0);
3075
3076         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3077                 flags |= ASYNC_READY;
3078
3079         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3080             cfs_list_empty(&oap->oap_rpc_item)) {
3081                 if (oap->oap_async_flags & ASYNC_HP)
3082                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3083                 else
3084                         cfs_list_add_tail(&oap->oap_urgent_item,
3085                                           &lop->lop_urgent);
3086                 flags |= ASYNC_URGENT;
3087                 loi_list_maint(cli, loi);
3088         }
3089         cfs_spin_lock(&oap->oap_lock);
3090         oap->oap_async_flags |= flags;
3091         cfs_spin_unlock(&oap->oap_lock);
3092
3093         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3094                         oap->oap_async_flags);
3095         RETURN(0);
3096 }
3097
3098 int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
3099                             struct lov_oinfo *loi, struct osc_async_page *oap)
3100 {
3101         struct client_obd *cli = &exp->exp_obd->u.cli;
3102         struct loi_oap_pages *lop;
3103         int rc = 0;
3104         ENTRY;
3105
3106         if (oap->oap_magic != OAP_MAGIC)
3107                 RETURN(-EINVAL);
3108
3109         if (loi == NULL)
3110                 loi = lsm->lsm_oinfo[0];
3111
3112         if (oap->oap_cmd & OBD_BRW_WRITE) {
3113                 lop = &loi->loi_write_lop;
3114         } else {
3115                 lop = &loi->loi_read_lop;
3116         }
3117
3118         client_obd_list_lock(&cli->cl_loi_list_lock);
3119
3120         if (!cfs_list_empty(&oap->oap_rpc_item))
3121                 GOTO(out, rc = -EBUSY);
3122
3123         osc_exit_cache(cli, oap, 0);
3124         osc_wake_cache_waiters(cli);
3125
3126         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3127                 cfs_list_del_init(&oap->oap_urgent_item);
3128                 cfs_spin_lock(&oap->oap_lock);
3129                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3130                 cfs_spin_unlock(&oap->oap_lock);
3131         }
3132         if (!cfs_list_empty(&oap->oap_pending_item)) {
3133                 cfs_list_del_init(&oap->oap_pending_item);
3134                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3135         }
3136         loi_list_maint(cli, loi);
3137         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3138 out:
3139         client_obd_list_unlock(&cli->cl_loi_list_lock);
3140         RETURN(rc);
3141 }
3142
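     /* Attach einfo->ei_cbdata to the lock's l_ast_data if nothing is set
      * there yet, and report whether l_ast_data now matches.  A zero return
      * means the lock already carries data for a different object, so the
      * caller must not reuse the cached lock. */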
3143 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3144                                         struct ldlm_enqueue_info *einfo)
3145 {
3146         void *data = einfo->ei_cbdata;
3147         int set = 0;
3148
3149         LASSERT(lock != NULL);
3150         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3151         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3152         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3153         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3154
3155         lock_res_and_lock(lock);
3156         cfs_spin_lock(&osc_ast_guard);
3157
3158         if (lock->l_ast_data == NULL)
3159                 lock->l_ast_data = data;
3160         if (lock->l_ast_data == data)
3161                 set = 1;
3162
3163         cfs_spin_unlock(&osc_ast_guard);
3164         unlock_res_and_lock(lock);
3165
3166         return set;
3167 }
3168
3169 static int osc_set_data_with_check(struct lustre_handle *lockh,
3170                                    struct ldlm_enqueue_info *einfo)
3171 {
3172         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3173         int set = 0;
3174
3175         if (lock != NULL) {
3176                 set = osc_set_lock_data_with_check(lock, einfo);
3177                 LDLM_LOCK_PUT(lock);
3178         } else
3179                 CERROR("lockh %p, data %p - client evicted?\n",
3180                        lockh, einfo->ei_cbdata);
3181         return set;
3182 }
3183
3184 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3185                              ldlm_iterator_t replace, void *data)
3186 {
3187         struct ldlm_res_id res_id;
3188         struct obd_device *obd = class_exp2obd(exp);
3189
3190         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3191         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3192         return 0;
3193 }
3194
3195 /* find any ldlm lock of the inode in osc
3196  * return 0    if no lock is found
3197  *        1    if a lock is found
3198  *      < 0    on error */
3199 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3200                            ldlm_iterator_t replace, void *data)
3201 {
3202         struct ldlm_res_id res_id;
3203         struct obd_device *obd = class_exp2obd(exp);
3204         int rc = 0;
3205
3206         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3207         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3208         if (rc == LDLM_ITER_STOP)
3209                 return(1);
3210         if (rc == LDLM_ITER_CONTINUE)
3211                 return(0);
3212         return(rc);
3213 }
3214
3215 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3216                             obd_enqueue_update_f upcall, void *cookie,
3217                             int *flags, int rc)
3218 {
3219         int intent = *flags & LDLM_FL_HAS_INTENT;
3220         ENTRY;
3221
3222         if (intent) {
3223                 /* The request was created before the ldlm_cli_enqueue() call. */
3224                 if (rc == ELDLM_LOCK_ABORTED) {
3225                         struct ldlm_reply *rep;
3226                         rep = req_capsule_server_get(&req->rq_pill,
3227                                                      &RMF_DLM_REP);
3228
3229                         LASSERT(rep != NULL);
3230                         if (rep->lock_policy_res1)
3231                                 rc = rep->lock_policy_res1;
3232                 }
3233         }
3234
3235         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3236                 *flags |= LDLM_FL_LVB_READY;
3237                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3238                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3239         }
3240
3241         /* Call the update callback. */
3242         rc = (*upcall)(cookie, rc);
3243         RETURN(rc);
3244 }
3245
3246 static int osc_enqueue_interpret(const struct lu_env *env,
3247                                  struct ptlrpc_request *req,
3248                                  struct osc_enqueue_args *aa, int rc)
3249 {
3250         struct ldlm_lock *lock;
3251         struct lustre_handle handle;
3252         __u32 mode;
3253
3254         /* Make a local copy of the lock handle and mode, because aa->oa_*
3255          * might be freed at any time after the lock upcall has been called. */
3256         lustre_handle_copy(&handle, aa->oa_lockh);
3257         mode = aa->oa_ei->ei_mode;
3258
3259         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3260          * be valid. */
3261         lock = ldlm_handle2lock(&handle);
3262
3263         /* Take an additional reference so that a blocking AST that
3264          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3265          * to arrive after an upcall has been executed by
3266          * osc_enqueue_fini(). */
3267         ldlm_lock_addref(&handle, mode);
3268
3269         /* Let the CP AST grant the lock first. */
3270         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3271
3272         /* Complete obtaining the lock procedure. */
3273         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3274                                    mode, aa->oa_flags, aa->oa_lvb,
3275                                    sizeof(*aa->oa_lvb), &handle, rc);
3276         /* Complete osc stuff. */
3277         rc = osc_enqueue_fini(req, aa->oa_lvb,
3278                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3279
3280         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3281
3282         /* Release the lock for async request. */
3283         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3284                 /*
3285                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3286                  * not already released by
3287                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3288                  */
3289                 ldlm_lock_decref(&handle, mode);
3290
3291         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3292                  aa->oa_lockh, req, aa);
3293         ldlm_lock_decref(&handle, mode);
3294         LDLM_LOCK_PUT(lock);
3295         return rc;
3296 }
3297
3298 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3299                         struct lov_oinfo *loi, int flags,
3300                         struct ost_lvb *lvb, __u32 mode, int rc)
3301 {
3302         if (rc == ELDLM_OK) {
3303                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3304                 __u64 tmp;
3305
3306                 LASSERT(lock != NULL);
3307                 loi->loi_lvb = *lvb;
3308                 tmp = loi->loi_lvb.lvb_size;
3309                 /* Extend KMS up to the end of this lock and no further
3310                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
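                     /* e.g. a granted lock on the extent [0, 4095] permits a
                      * kms of 4096 bytes. */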
3311                 if (tmp > lock->l_policy_data.l_extent.end)
3312                         tmp = lock->l_policy_data.l_extent.end + 1;
3313                 if (tmp >= loi->loi_kms) {
3314                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3315                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3316                         loi_kms_set(loi, tmp);
3317                 } else {
3318                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3319                                    LPU64"; leaving kms="LPU64", end="LPU64,
3320                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3321                                    lock->l_policy_data.l_extent.end);
3322                 }
3323                 ldlm_lock_allow_match(lock);
3324                 LDLM_LOCK_PUT(lock);
3325         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3326                 loi->loi_lvb = *lvb;
3327                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3328                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3329                 rc = ELDLM_OK;
3330         }
3331 }
3332 EXPORT_SYMBOL(osc_update_enqueue);
3333
3334 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3335
3336 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
3337  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
3338  * with other synchronous requests; however, holding some locks while trying to
3339  * obtain others may take a considerable amount of time in the case of OST
3340  * failure, and when other sync requests cannot get a lock released by a
3341  * client, that client is excluded from the cluster -- such scenarios make
3342  * life difficult, so release locks just after they are obtained. */
3343 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3344                      int *flags, ldlm_policy_data_t *policy,
3345                      struct ost_lvb *lvb, int kms_valid,
3346                      obd_enqueue_update_f upcall, void *cookie,
3347                      struct ldlm_enqueue_info *einfo,
3348                      struct lustre_handle *lockh,
3349                      struct ptlrpc_request_set *rqset, int async)
3350 {
3351         struct obd_device *obd = exp->exp_obd;
3352         struct ptlrpc_request *req = NULL;
3353         int intent = *flags & LDLM_FL_HAS_INTENT;
3354         ldlm_mode_t mode;
3355         int rc;
3356         ENTRY;
3357
3358         /* Filesystem lock extents are extended to page boundaries so that
3359          * dealing with the page cache is a little smoother.  */
3360         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3361         policy->l_extent.end |= ~CFS_PAGE_MASK;
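             /* For example, assuming 4096-byte pages (so ~CFS_PAGE_MASK ==
              * 4095), a request for bytes [5000, 6000] is widened to the
              * page-aligned extent [4096, 8191]. */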
3362
3363         /*
3364          * kms is not valid when either object is completely fresh (so that no
3365          * locks are cached), or object was evicted. In the latter case cached
3366          * lock cannot be used, because it would prime inode state with
3367          * potentially stale LVB.
3368          */
3369         if (!kms_valid)
3370                 goto no_match;
3371
3372         /* Next, search for already existing extent locks that will cover us */
3373         /* If we're trying to read, we also search for an existing PW lock.  The
3374          * VFS and page cache already protect us locally, so lots of readers/
3375          * writers can share a single PW lock.
3376          *
3377          * There are problems with conversion deadlocks, so instead of
3378          * converting a read lock to a write lock, we'll just enqueue a new
3379          * one.
3380          *
3381          * At some point we should cancel the read lock instead of making them
3382          * send us a blocking callback, but there are problems with canceling
3383          * locks out from other users right now, too. */
3384         mode = einfo->ei_mode;
3385         if (einfo->ei_mode == LCK_PR)
3386                 mode |= LCK_PW;
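             /* "mode" is now a bitmask of acceptable modes, so a PR request
              * can be satisfied by an existing PR or PW lock. */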
3387         mode = ldlm_lock_match(obd->obd_namespace,
3388                                *flags | LDLM_FL_LVB_READY, res_id,
3389                                einfo->ei_type, policy, mode, lockh, 0);
3390         if (mode) {
3391                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3392
3393                 if (osc_set_lock_data_with_check(matched, einfo)) {
3394                         /* addref the lock only if not async requests and PW
3395                          * lock is matched whereas we asked for PR. */
3396                         if (!rqset && einfo->ei_mode != mode)
3397                                 ldlm_lock_addref(lockh, LCK_PR);
3398                         if (intent) {
3399                                 /* I would like to be able to ASSERT here that
3400                                  * rss <= kms, but I can't, for reasons which
3401                                  * are explained in lov_enqueue() */
3402                         }
3403
3404                         /* We already have a lock, and it's referenced */
3405                         (*upcall)(cookie, ELDLM_OK);
3406
3407                         /* For async requests, decref the lock. */
3408                         if (einfo->ei_mode != mode)
3409                                 ldlm_lock_decref(lockh, LCK_PW);
3410                         else if (rqset)
3411                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3412                         LDLM_LOCK_PUT(matched);
3413                         RETURN(ELDLM_OK);
3414                 } else
3415                         ldlm_lock_decref(lockh, mode);
3416                 LDLM_LOCK_PUT(matched);
3417         }
3418
3419  no_match:
3420         if (intent) {
3421                 CFS_LIST_HEAD(cancels);
3422                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3423                                            &RQF_LDLM_ENQUEUE_LVB);
3424                 if (req == NULL)
3425                         RETURN(-ENOMEM);
3426
3427                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3428                 if (rc) {
3429                         ptlrpc_request_free(req);
3430                         RETURN(rc);
3431                 }
3432
3433                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3434                                      sizeof *lvb);
3435                 ptlrpc_request_set_replen(req);
3436         }
3437
3438         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3439         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3440
3441         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3442                               sizeof(*lvb), lockh, async);
3443         if (rqset) {
3444                 if (!rc) {
3445                         struct osc_enqueue_args *aa;
3446                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3447                         aa = ptlrpc_req_async_args(req);
3448                         aa->oa_ei = einfo;
3449                         aa->oa_exp = exp;
3450                         aa->oa_flags  = flags;
3451                         aa->oa_upcall = upcall;
3452                         aa->oa_cookie = cookie;
3453                         aa->oa_lvb    = lvb;
3454                         aa->oa_lockh  = lockh;
3455
3456                         req->rq_interpret_reply =
3457                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3458                         if (rqset == PTLRPCD_SET)
3459                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3460                         else
3461                                 ptlrpc_set_add_req(rqset, req);
3462                 } else if (intent) {
3463                         ptlrpc_req_finished(req);
3464                 }
3465                 RETURN(rc);
3466         }
3467
3468         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3469         if (intent)
3470                 ptlrpc_req_finished(req);
3471
3472         RETURN(rc);
3473 }
3474
3475 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3476                        struct ldlm_enqueue_info *einfo,
3477                        struct ptlrpc_request_set *rqset)
3478 {
3479         struct ldlm_res_id res_id;
3480         int rc;
3481         ENTRY;
3482
3483         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3484                            oinfo->oi_md->lsm_object_seq, &res_id);
3485
3486         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3487                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3488                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3489                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3490                               rqset, rqset != NULL);
3491         RETURN(rc);
3492 }
3493
3494 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3495                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3496                    int *flags, void *data, struct lustre_handle *lockh,
3497                    int unref)
3498 {
3499         struct obd_device *obd = exp->exp_obd;
3500         int lflags = *flags;
3501         ldlm_mode_t rc;
3502         ENTRY;
3503
3504         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3505                 RETURN(-EIO);
3506
3507         /* Filesystem lock extents are extended to page boundaries so that
3508          * dealing with the page cache is a little smoother */
3509         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3510         policy->l_extent.end |= ~CFS_PAGE_MASK;
3511
3512         /* Next, search for already existing extent locks that will cover us */
3513         /* If we're trying to read, we also search for an existing PW lock.  The
3514          * VFS and page cache already protect us locally, so lots of readers/
3515          * writers can share a single PW lock. */
3516         rc = mode;
3517         if (mode == LCK_PR)
3518                 rc |= LCK_PW;
3519         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3520                              res_id, type, policy, rc, lockh, unref);
3521         if (rc) {
3522                 if (data != NULL) {
3523                         if (!osc_set_data_with_check(lockh, data)) {
3524                                 if (!(lflags & LDLM_FL_TEST_LOCK))
3525                                         ldlm_lock_decref(lockh, rc);
3526                                 RETURN(0);
3527                         }
3528                 }
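                     /* A PW lock matched a PR request: keep a PR reference for
                      * the caller and drop the PW reference taken by the match
                      * above. */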
3529                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3530                         ldlm_lock_addref(lockh, LCK_PR);
3531                         ldlm_lock_decref(lockh, LCK_PW);
3532                 }
3533                 RETURN(rc);
3534         }
3535         RETURN(rc);
3536 }
3537
3538 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3539 {
3540         ENTRY;
3541
3542         if (unlikely(mode == LCK_GROUP))
3543                 ldlm_lock_decref_and_cancel(lockh, mode);
3544         else
3545                 ldlm_lock_decref(lockh, mode);
3546
3547         RETURN(0);
3548 }
3549
3550 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3551                       __u32 mode, struct lustre_handle *lockh)
3552 {
3553         ENTRY;
3554         RETURN(osc_cancel_base(lockh, mode));
3555 }
3556
3557 static int osc_cancel_unused(struct obd_export *exp,
3558                              struct lov_stripe_md *lsm,
3559                              ldlm_cancel_flags_t flags,
3560                              void *opaque)
3561 {
3562         struct obd_device *obd = class_exp2obd(exp);
3563         struct ldlm_res_id res_id, *resp = NULL;
3564
3565         if (lsm != NULL) {
3566                 resp = osc_build_res_name(lsm->lsm_object_id,
3567                                           lsm->lsm_object_seq, &res_id);
3568         }
3569
3570         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3571 }
3572
3573 static int osc_statfs_interpret(const struct lu_env *env,
3574                                 struct ptlrpc_request *req,
3575                                 struct osc_async_args *aa, int rc)
3576 {
3577         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3578         struct obd_statfs *msfs;
3579         __u64 used;
3580         ENTRY;
3581
3582         if (rc == -EBADR)
3583                 /* The request has in fact never been sent
3584                  * due to issues at a higher level (LOV).
3585                  * Exit immediately since the caller is
3586                  * aware of the problem and takes care
3587                  * of the clean up */
3588                  RETURN(rc);
3589
3590         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3591             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3592                 GOTO(out, rc = 0);
3593
3594         if (rc != 0)
3595                 GOTO(out, rc);
3596
3597         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3598         if (msfs == NULL) {
3599                 GOTO(out, rc = -EPROTO);
3600         }
3601
3602         /* Reinitialize the RDONLY and DEGRADED flags at the client
3603          * on each statfs, so they don't stay set permanently. */
3604         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3605
3606         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3607                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3608         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3609                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3610
3611         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3612                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3613         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3614                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3615
3616         /* Add a bit of hysteresis so this flag isn't continually flapping,
3617          * and ensure that new files don't get extremely fragmented due to
3618          * only a small amount of available space in the filesystem.
3619          * We want to set the NOSPC flag when there is less than ~0.1% free
3620          * and clear it when there is at least ~0.2% free space, so:
3621          *                   avail < ~0.1% max          max = avail + used
3622          *            1025 * avail < avail + used       used = blocks - free
3623          *            1024 * avail < used
3624          *            1024 * avail < blocks - free
3625          *                   avail < ((blocks - free) >> 10)
3626          *
3627          * On a very large disk, say 16TB, 0.1% will be 16 GB. We don't want
3628          * to lose that much space, so in those cases we report no space left
3629          * if there is less than 1 GB left.                             */
3630         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
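             /* Worked example: if (os_blocks - os_bfree) is 1,024,000 blocks,
              * used is 1,000 blocks, so NOSPC is set once os_bavail drops
              * below 1,000 blocks (or fewer than 32 inodes remain free), and
              * is only cleared once os_bavail exceeds 2,000 blocks and more
              * than 64 inodes are free. */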
3631         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3632                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3633                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3634         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3635                           (msfs->os_ffree > 64) &&
3636                           (msfs->os_bavail > (used << 1)))) {
3637                 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
3638                                              OSCC_FLAG_NOSPC_BLK);
3639         }
3640
3641         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3642                      (msfs->os_bavail < used)))
3643                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
3644
3645         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3646
3647         *aa->aa_oi->oi_osfs = *msfs;
3648 out:
3649         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3650         RETURN(rc);
3651 }
3652
3653 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3654                             __u64 max_age, struct ptlrpc_request_set *rqset)
3655 {
3656         struct ptlrpc_request *req;
3657         struct osc_async_args *aa;
3658         int                    rc;
3659         ENTRY;
3660
3661         /* We could possibly pass max_age in the request (as an absolute
3662          * timestamp or a "seconds.usec ago") so the target can avoid doing
3663          * extra calls into the filesystem if that isn't necessary (e.g.
3664          * during mount that would help a bit).  Having relative timestamps
3665          * is not so great if request processing is slow, while absolute
3666          * timestamps are not ideal because they need time synchronization. */
3667         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3668         if (req == NULL)
3669                 RETURN(-ENOMEM);
3670
3671         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3672         if (rc) {
3673                 ptlrpc_request_free(req);
3674                 RETURN(rc);
3675         }
3676         ptlrpc_request_set_replen(req);
3677         req->rq_request_portal = OST_CREATE_PORTAL;
3678         ptlrpc_at_set_req_timeout(req);
3679
3680         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3681                 /* procfs requests should not wait on stat, to avoid deadlock */
3682                 req->rq_no_resend = 1;
3683                 req->rq_no_delay = 1;
3684         }
3685
3686         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3687         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3688         aa = ptlrpc_req_async_args(req);
3689         aa->aa_oi = oinfo;
3690
3691         ptlrpc_set_add_req(rqset, req);
3692         RETURN(0);
3693 }
3694
3695 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3696                       __u64 max_age, __u32 flags)
3697 {
3698         struct obd_statfs     *msfs;
3699         struct ptlrpc_request *req;
3700         struct obd_import     *imp = NULL;
3701         int rc;
3702         ENTRY;
3703
3704         /* Since the request might also come from lprocfs, we need to
3705          * sync this with client_disconnect_export (bug 15684) */
3706         cfs_down_read(&obd->u.cli.cl_sem);
3707         if (obd->u.cli.cl_import)
3708                 imp = class_import_get(obd->u.cli.cl_import);
3709         cfs_up_read(&obd->u.cli.cl_sem);
3710         if (!imp)
3711                 RETURN(-ENODEV);
3712
3713         /* We could possibly pass max_age in the request (as an absolute
3714          * timestamp or a "seconds.usec ago") so the target can avoid doing
3715          * extra calls into the filesystem if that isn't necessary (e.g.
3716          * during mount that would help a bit).  Having relative timestamps
3717          * is not so great if request processing is slow, while absolute
3718          * timestamps are not ideal because they need time synchronization. */
3719         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3720
3721         class_import_put(imp);
3722
3723         if (req == NULL)
3724                 RETURN(-ENOMEM);
3725
3726         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3727         if (rc) {
3728                 ptlrpc_request_free(req);
3729                 RETURN(rc);
3730         }
3731         ptlrpc_request_set_replen(req);
3732         req->rq_request_portal = OST_CREATE_PORTAL;
3733         ptlrpc_at_set_req_timeout(req);
3734
3735         if (flags & OBD_STATFS_NODELAY) {
3736                 /* procfs requests should not wait on stat, to avoid deadlock */
3737                 req->rq_no_resend = 1;
3738                 req->rq_no_delay = 1;
3739         }
3740
3741         rc = ptlrpc_queue_wait(req);
3742         if (rc)
3743                 GOTO(out, rc);
3744
3745         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3746         if (msfs == NULL) {
3747                 GOTO(out, rc = -EPROTO);
3748         }
3749
3750         *osfs = *msfs;
3751
3752         EXIT;
3753  out:
3754         ptlrpc_req_finished(req);
3755         return rc;
3756 }
3757
3758 /* Retrieve object striping information.
3759  *
3760  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3761  * the maximum number of OST indices which will fit in the user buffer.
3762  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3763  */
3764 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3765 {
3766         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3767         struct lov_user_md_v3 lum, *lumk;
3768         struct lov_user_ost_data_v1 *lmm_objects;
3769         int rc = 0, lum_size;
3770         ENTRY;
3771
3772         if (!lsm)
3773                 RETURN(-ENODATA);
3774
3775         /* we only need the header part from user space to get lmm_magic and
3776          * lmm_stripe_count, (the header part is common to v1 and v3) */
3777         lum_size = sizeof(struct lov_user_md_v1);
3778         if (cfs_copy_from_user(&lum, lump, lum_size))
3779                 RETURN(-EFAULT);
3780
3781         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3782             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3783                 RETURN(-EINVAL);
3784
3785         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3786         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3787         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3788         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3789
3790         /* we can use lov_mds_md_size() to compute lum_size
3791          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3792         if (lum.lmm_stripe_count > 0) {
3793                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3794                 OBD_ALLOC(lumk, lum_size);
3795                 if (!lumk)
3796                         RETURN(-ENOMEM);
3797
3798                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3799                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3800                 else
3801                         lmm_objects = &(lumk->lmm_objects[0]);
3802                 lmm_objects->l_object_id = lsm->lsm_object_id;
3803         } else {
3804                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3805                 lumk = &lum;
3806         }
3807
3808         lumk->lmm_object_id = lsm->lsm_object_id;
3809         lumk->lmm_object_seq = lsm->lsm_object_seq;
3810         lumk->lmm_stripe_count = 1;
3811
3812         if (cfs_copy_to_user(lump, lumk, lum_size))
3813                 rc = -EFAULT;
3814
3815         if (lumk != &lum)
3816                 OBD_FREE(lumk, lum_size);
3817
3818         RETURN(rc);
3819 }
3820
3821
3822 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3823                          void *karg, void *uarg)
3824 {
3825         struct obd_device *obd = exp->exp_obd;
3826         struct obd_ioctl_data *data = karg;
3827         int err = 0;
3828         ENTRY;
3829
3830         if (!cfs_try_module_get(THIS_MODULE)) {
3831                 CERROR("Can't get module. Is it alive?\n");
3832                 return -EINVAL;
3833         }
3834         switch (cmd) {
3835         case OBD_IOC_LOV_GET_CONFIG: {
3836                 char *buf;
3837                 struct lov_desc *desc;
3838                 struct obd_uuid uuid;
3839
3840                 buf = NULL;
3841                 len = 0;
3842                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3843                         GOTO(out, err = -EINVAL);
3844
3845                 data = (struct obd_ioctl_data *)buf;
3846
3847                 if (sizeof(*desc) > data->ioc_inllen1) {
3848                         obd_ioctl_freedata(buf, len);
3849                         GOTO(out, err = -EINVAL);
3850                 }
3851
3852                 if (data->ioc_inllen2 < sizeof(uuid)) {
3853                         obd_ioctl_freedata(buf, len);
3854                         GOTO(out, err = -EINVAL);
3855                 }
3856
3857                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3858                 desc->ld_tgt_count = 1;
3859                 desc->ld_active_tgt_count = 1;
3860                 desc->ld_default_stripe_count = 1;
3861                 desc->ld_default_stripe_size = 0;
3862                 desc->ld_default_stripe_offset = 0;
3863                 desc->ld_pattern = 0;
3864                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3865
3866                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3867
3868                 err = cfs_copy_to_user((void *)uarg, buf, len);
3869                 if (err)
3870                         err = -EFAULT;
3871                 obd_ioctl_freedata(buf, len);
3872                 GOTO(out, err);
3873         }
3874         case LL_IOC_LOV_SETSTRIPE:
3875                 err = obd_alloc_memmd(exp, karg);
3876                 if (err > 0)
3877                         err = 0;
3878                 GOTO(out, err);
3879         case LL_IOC_LOV_GETSTRIPE:
3880                 err = osc_getstripe(karg, uarg);
3881                 GOTO(out, err);
3882         case OBD_IOC_CLIENT_RECOVER:
3883                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3884                                             data->ioc_inlbuf1);
3885                 if (err > 0)
3886                         err = 0;
3887                 GOTO(out, err);
3888         case IOC_OSC_SET_ACTIVE:
3889                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3890                                                data->ioc_offset);
3891                 GOTO(out, err);
3892         case OBD_IOC_POLL_QUOTACHECK:
3893                 err = lquota_poll_check(quota_interface, exp,
3894                                         (struct if_quotacheck *)karg);
3895                 GOTO(out, err);
3896         case OBD_IOC_PING_TARGET:
3897                 err = ptlrpc_obd_ping(obd);
3898                 GOTO(out, err);
3899         default:
3900                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3901                        cmd, cfs_curproc_comm());
3902                 GOTO(out, err = -ENOTTY);
3903         }
3904 out:
3905         cfs_module_put(THIS_MODULE);
3906         return err;
3907 }
3908
3909 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3910                         void *key, __u32 *vallen, void *val,
3911                         struct lov_stripe_md *lsm)
3912 {
3913         ENTRY;
3914         if (!vallen || !val)
3915                 RETURN(-EFAULT);
3916
3917         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3918                 __u32 *stripe = val;
3919                 *vallen = sizeof(*stripe);
3920                 *stripe = 0;
3921                 RETURN(0);
3922         } else if (KEY_IS(KEY_LAST_ID)) {
3923                 struct ptlrpc_request *req;
3924                 obd_id                *reply;
3925                 char                  *tmp;
3926                 int                    rc;
3927
3928                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3929                                            &RQF_OST_GET_INFO_LAST_ID);
3930                 if (req == NULL)
3931                         RETURN(-ENOMEM);
3932
3933                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3934                                      RCL_CLIENT, keylen);
3935                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3936                 if (rc) {
3937                         ptlrpc_request_free(req);
3938                         RETURN(rc);
3939                 }
3940
3941                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3942                 memcpy(tmp, key, keylen);
3943
3944                 req->rq_no_delay = req->rq_no_resend = 1;
3945                 ptlrpc_request_set_replen(req);
3946                 rc = ptlrpc_queue_wait(req);
3947                 if (rc)
3948                         GOTO(out, rc);
3949
3950                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3951                 if (reply == NULL)
3952                         GOTO(out, rc = -EPROTO);
3953
3954                 *((obd_id *)val) = *reply;
3955         out:
3956                 ptlrpc_req_finished(req);
3957                 RETURN(rc);
3958         } else if (KEY_IS(KEY_FIEMAP)) {
3959                 struct ptlrpc_request *req;
3960                 struct ll_user_fiemap *reply;
3961                 char *tmp;
3962                 int rc;
3963
3964                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3965                                            &RQF_OST_GET_INFO_FIEMAP);
3966                 if (req == NULL)
3967                         RETURN(-ENOMEM);
3968
3969                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3970                                      RCL_CLIENT, keylen);
3971                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3972                                      RCL_CLIENT, *vallen);
3973                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3974                                      RCL_SERVER, *vallen);
3975
3976                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3977                 if (rc) {
3978                         ptlrpc_request_free(req);
3979                         RETURN(rc);
3980                 }
3981
3982                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3983                 memcpy(tmp, key, keylen);
3984                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3985                 memcpy(tmp, val, *vallen);
3986
3987                 ptlrpc_request_set_replen(req);
3988                 rc = ptlrpc_queue_wait(req);
3989                 if (rc)
3990                         GOTO(out1, rc);
3991
3992                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3993                 if (reply == NULL)
3994                         GOTO(out1, rc = -EPROTO);
3995
3996                 memcpy(val, reply, *vallen);
3997         out1:
3998                 ptlrpc_req_finished(req);
3999
4000                 RETURN(rc);
4001         }
4002
4003         RETURN(-EINVAL);
4004 }
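
/*
 * Illustrative sketch (excluded from the build): fetching the last
 * allocated object id through the KEY_LAST_ID branch above.  The caller
 * supplies the key string and a value buffer of the right size; the helper
 * name is invented for illustration.
 */
#if 0
static int example_get_last_id(struct obd_export *exp, obd_id *last_id)
{
        __u32 vallen = sizeof(*last_id);

        /* Dispatches to osc_get_info(), which sends OST_GET_INFO. */
        return obd_get_info(exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
                            &vallen, last_id, NULL);
}
#endif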
4005
4006 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4007 {
4008         struct llog_ctxt *ctxt;
4009         int rc = 0;
4010         ENTRY;
4011
4012         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4013         if (ctxt) {
4014                 rc = llog_initiator_connect(ctxt);
4015                 llog_ctxt_put(ctxt);
4016         } else {
4017                 /* XXX return an error? skip setting the flags below? */
4018         }
4019
4020         cfs_spin_lock(&imp->imp_lock);
4021         imp->imp_server_timeout = 1;
4022         imp->imp_pingable = 1;
4023         cfs_spin_unlock(&imp->imp_lock);
4024         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4025
4026         RETURN(rc);
4027 }
4028
4029 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4030                                           struct ptlrpc_request *req,
4031                                           void *aa, int rc)
4032 {
4033         ENTRY;
4034         if (rc != 0)
4035                 RETURN(rc);
4036
4037         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
4038 }
4039
4040 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4041                               void *key, obd_count vallen, void *val,
4042                               struct ptlrpc_request_set *set)
4043 {
4044         struct ptlrpc_request *req;
4045         struct obd_device     *obd = exp->exp_obd;
4046         struct obd_import     *imp = class_exp2cliimp(exp);
4047         char                  *tmp;
4048         int                    rc;
4049         ENTRY;
4050
4051         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4052
4053         if (KEY_IS(KEY_NEXT_ID)) {
4054                 obd_id new_val;
4055                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4056
4057                 if (vallen != sizeof(obd_id))
4058                         RETURN(-ERANGE);
4059                 if (val == NULL)
4060                         RETURN(-EINVAL);
4061
4065                 /* Avoid a race between allocating a new object and
4066                  * setting the next id from the ll_sync thread. */
4067                 cfs_spin_lock(&oscc->oscc_lock);
4068                 new_val = *((obd_id*)val) + 1;
4069                 if (new_val > oscc->oscc_next_id)
4070                         oscc->oscc_next_id = new_val;
4071                 cfs_spin_unlock(&oscc->oscc_lock);
4072                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4073                        exp->exp_obd->obd_name,
4074                        obd->u.cli.cl_oscc.oscc_next_id);
4075
4076                 RETURN(0);
4077         }
4078
4079         if (KEY_IS(KEY_CHECKSUM)) {
4080                 if (vallen != sizeof(int))
4081                         RETURN(-EINVAL);
4082                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4083                 RETURN(0);
4084         }
4085
4086         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4087                 sptlrpc_conf_client_adapt(obd);
4088                 RETURN(0);
4089         }
4090
4091         if (KEY_IS(KEY_FLUSH_CTX)) {
4092                 sptlrpc_import_flush_my_ctx(imp);
4093                 RETURN(0);
4094         }
4095
4096         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4097                 RETURN(-EINVAL);
4098
4099         /* We pass all other commands directly to the OST. Since nobody
4100            calls OSC methods directly and everyone is supposed to go through
4101            LOV, we assume LOV has already checked the values for us.
4102            The only recognised keys so far are evict_by_nid and mds_conn.
4103            Even if something bad got through, we'd get a -EINVAL from the
4104            OST anyway. */
4105
4106         if (KEY_IS(KEY_GRANT_SHRINK))
4107                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4108         else
4109                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4110
4111         if (req == NULL)
4112                 RETURN(-ENOMEM);
4113
4114         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4115                              RCL_CLIENT, keylen);
4116         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4117                              RCL_CLIENT, vallen);
4118         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4119         if (rc) {
4120                 ptlrpc_request_free(req);
4121                 RETURN(rc);
4122         }
4123
4124         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4125         memcpy(tmp, key, keylen);
4126         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4127         memcpy(tmp, val, vallen);
4128
4129         if (KEY_IS(KEY_MDS_CONN)) {
4130                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4131
4132                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4133                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4134                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4135                 req->rq_no_delay = req->rq_no_resend = 1;
4136                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4137         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4138                 struct osc_grant_args *aa;
4139                 struct obdo *oa;
4140
4141                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4142                 aa = ptlrpc_req_async_args(req);
4143                 OBDO_ALLOC(oa);
4144                 if (!oa) {
4145                         ptlrpc_req_finished(req);
4146                         RETURN(-ENOMEM);
4147                 }
4148                 *oa = ((struct ost_body *)val)->oa;
4149                 aa->aa_oa = oa;
4150                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4151         }
4152
4153         ptlrpc_request_set_replen(req);
4154         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4155                 LASSERT(set != NULL);
4156                 ptlrpc_set_add_req(set, req);
4157                 ptlrpc_check_set(NULL, set);
4158         } else
4159                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4160
4161         RETURN(0);
4162 }
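
/*
 * Illustrative sketch (excluded from the build): toggling wire checksums
 * through the KEY_CHECKSUM branch above.  That key is handled entirely on
 * the client, so no request set is needed despite the _async name.  The
 * helper name is invented for illustration.
 */
#if 0
static int example_enable_checksums(struct obd_export *exp)
{
        int on = 1;

        return obd_set_info_async(exp, sizeof(KEY_CHECKSUM), KEY_CHECKSUM,
                                  sizeof(on), &on, NULL);
}
#endif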
4163
4164
4165 static struct llog_operations osc_size_repl_logops = {
4166         .lop_cancel = llog_obd_repl_cancel,
4167 };
4168
4169 static struct llog_operations osc_mds_ost_orig_logops;
4170
4171 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4172                            struct obd_device *tgt, struct llog_catid *catid)
4173 {
4174         int rc;
4175         ENTRY;
4176
4177         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4178                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4179         if (rc) {
4180                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4181                 GOTO(out, rc);
4182         }
4183
4184         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4185                         NULL, &osc_size_repl_logops);
4186         if (rc) {
4187                 struct llog_ctxt *ctxt =
4188                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4189                 if (ctxt)
4190                         llog_cleanup(ctxt);
4191                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4192         }
4193         GOTO(out, rc);
4194 out:
4195         if (rc) {
4196                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4197                        obd->obd_name, tgt->obd_name, catid, rc);
4198                 CERROR("logid "LPX64":0x%x\n",
4199                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4200         }
4201         return rc;
4202 }
4203
4204 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4205                          struct obd_device *disk_obd, int *index)
4206 {
4207         struct llog_catid catid;
4208         static char name[32] = CATLIST;
4209         int rc;
4210         ENTRY;
4211
4212         LASSERT(olg == &obd->obd_olg);
4213
4214         cfs_mutex_down(&olg->olg_cat_processing);
4215         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4216         if (rc) {
4217                 CERROR("llog_get_cat_list failed: %d\n", rc);
4218                 GOTO(out, rc);
4219         }
4220
4221         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4222                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4223                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4224
4225         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4226         if (rc) {
4227                 CERROR("__osc_llog_init failed: %d\n", rc);
4228                 GOTO(out, rc);
4229         }
4230
4231         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4232         if (rc) {
4233                 CERROR("llog_put_cat_list failed: %d\n", rc);
4234                 GOTO(out, rc);
4235         }
4236
4237  out:
4238         cfs_mutex_up(&olg->olg_cat_processing);
4239
4240         return rc;
4241 }
4242
4243 static int osc_llog_finish(struct obd_device *obd, int count)
4244 {
4245         struct llog_ctxt *ctxt;
4246         int rc = 0, rc2 = 0;
4247         ENTRY;
4248
4249         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4250         if (ctxt)
4251                 rc = llog_cleanup(ctxt);
4252
4253         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4254         if (ctxt)
4255                 rc2 = llog_cleanup(ctxt);
4256         if (!rc)
4257                 rc = rc2;
4258
4259         RETURN(rc);
4260 }
4261
4262 static int osc_reconnect(const struct lu_env *env,
4263                          struct obd_export *exp, struct obd_device *obd,
4264                          struct obd_uuid *cluuid,
4265                          struct obd_connect_data *data,
4266                          void *localdata)
4267 {
4268         struct client_obd *cli = &obd->u.cli;
4269
4270         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4271                 long lost_grant;
4272
4273                 client_obd_list_lock(&cli->cl_loi_list_lock);
4274                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4275                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4276                 lost_grant = cli->cl_lost_grant;
4277                 cli->cl_lost_grant = 0;
4278                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4279
4280                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4281                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4282                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4283                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4284                        " ocd_grant: %d\n", data->ocd_connect_flags,
4285                        data->ocd_version, data->ocd_grant);
4286         }
4287
4288         RETURN(0);
4289 }
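
/*
 * A worked example of the grant request above (sketch values, not
 * measurements): with cl_avail_grant and cl_dirty both zero, a client with
 * cl_max_pages_per_rpc = 256 and CFS_PAGE_SHIFT = 12 asks for
 * 2 * 256 << 12 = 2MB of grant on reconnect; otherwise it asks for exactly
 * the grant it already holds plus what it has dirtied.
 */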
4290
4291 static int osc_disconnect(struct obd_export *exp)
4292 {
4293         struct obd_device *obd = class_exp2obd(exp);
4294         struct llog_ctxt  *ctxt;
4295         int rc;
4296
4297         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4298         if (ctxt) {
4299                 if (obd->u.cli.cl_conn_count == 1) {
4300                         /* Flush any remaining cancel messages out to the
4301                          * target */
4302                         llog_sync(ctxt, exp);
4303                 }
4304                 llog_ctxt_put(ctxt);
4305         } else {
4306                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4307                        obd);
4308         }
4309
4310         rc = client_disconnect_export(exp);
4311         /**
4312          * Initially we put del_shrink_grant before disconnect_export, but it
4313          * causes the following problem if setup (connect) and cleanup
4314          * (disconnect) are tangled together.
4315          *      connect p1                     disconnect p2
4316          *   ptlrpc_connect_import
4317          *     ...............               class_manual_cleanup
4318          *                                     osc_disconnect
4319          *                                     del_shrink_grant
4320          *   ptlrpc_connect_interrupt
4321          *     init_grant_shrink
4322          *   add this client to shrink list
4323          *                                      cleanup_osc
4324          * Bang! the pinger triggers the shrink.
4325          * So the osc should be removed from the shrink list only after we
4326          * are sure the import has been destroyed. BUG18662
4327          */
4328         if (obd->u.cli.cl_import == NULL)
4329                 osc_del_shrink_grant(&obd->u.cli);
4330         return rc;
4331 }
4332
4333 static int osc_import_event(struct obd_device *obd,
4334                             struct obd_import *imp,
4335                             enum obd_import_event event)
4336 {
4337         struct client_obd *cli;
4338         int rc = 0;
4339
4340         ENTRY;
4341         LASSERT(imp->imp_obd == obd);
4342
4343         switch (event) {
4344         case IMP_EVENT_DISCON: {
4345                 /* Only do this for the MDS OSCs */
4346                 if (imp->imp_server_timeout) {
4347                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4348
4349                         cfs_spin_lock(&oscc->oscc_lock);
4350                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4351                         cfs_spin_unlock(&oscc->oscc_lock);
4352                 }
4353                 cli = &obd->u.cli;
4354                 client_obd_list_lock(&cli->cl_loi_list_lock);
4355                 cli->cl_avail_grant = 0;
4356                 cli->cl_lost_grant = 0;
4357                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4358                 break;
4359         }
4360         case IMP_EVENT_INACTIVE: {
4361                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4362                 break;
4363         }
4364         case IMP_EVENT_INVALIDATE: {
4365                 struct ldlm_namespace *ns = obd->obd_namespace;
4366                 struct lu_env         *env;
4367                 int                    refcheck;
4368
4369                 env = cl_env_get(&refcheck);
4370                 if (!IS_ERR(env)) {
4371                         /* Reset grants */
4372                         cli = &obd->u.cli;
4373                         client_obd_list_lock(&cli->cl_loi_list_lock);
4374                         /* All pages go to failing RPCs due to the
4375                          * invalid import. */
4376                         osc_check_rpcs(env, cli);
4377                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4378
4379                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4380                         cl_env_put(env, &refcheck);
4381                 } else
4382                         rc = PTR_ERR(env);
4383                 break;
4384         }
4385         case IMP_EVENT_ACTIVE: {
4386                 /* Only do this for the MDS OSCs */
4387                 if (imp->imp_server_timeout) {
4388                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4389
4390                         cfs_spin_lock(&oscc->oscc_lock);
4391                         oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
4392                                               OSCC_FLAG_NOSPC_BLK);
4393                         cfs_spin_unlock(&oscc->oscc_lock);
4394                 }
4395                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4396                 break;
4397         }
4398         case IMP_EVENT_OCD: {
4399                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4400
4401                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4402                         osc_init_grant(&obd->u.cli, ocd);
4403
4404                 /* See bug 7198 */
4405                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4406                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
4407
4408                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4409                 break;
4410         }
4411         case IMP_EVENT_DEACTIVATE: {
4412                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4413                 break;
4414         }
4415         case IMP_EVENT_ACTIVATE: {
4416                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4417                 break;
4418         }
4419         default:
4420                 CERROR("Unknown import event %d\n", event);
4421                 LBUG();
4422         }
4423         RETURN(rc);
4424 }
4425
4426 /**
4427  * Determine whether the lock can be canceled before replaying the lock
4428  * during recovery, see bug16774 for detailed information.
4429  *
4430  * \retval zero the lock can't be canceled
4431  * \retval other ok to cancel
4432  */
4433 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4434 {
4435         check_res_locked(lock->l_resource);
4436
4437         /*
4438          * Cancel all unused extent locks with granted mode LCK_PR or LCK_CR.
4439          *
4440          * XXX as a future improvement, we could also cancel an unused write
4441          * lock if it has no dirty data and no active mmaps.
4442          */
4443         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4444             (lock->l_granted_mode == LCK_PR ||
4445              lock->l_granted_mode == LCK_CR) &&
4446             (osc_dlm_lock_pageref(lock) == 0))
4447                 RETURN(1);
4448
4449         RETURN(0);
4450 }
4451
4452 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4453 {
4454         int rc;
4455         ENTRY;
4456
4458         rc = ptlrpcd_addref();
4459         if (rc)
4460                 RETURN(rc);
4461
4462         rc = client_obd_setup(obd, lcfg);
4463         if (rc) {
4464                 ptlrpcd_decref();
4465         } else {
4466                 struct lprocfs_static_vars lvars = { 0 };
4467                 struct client_obd *cli = &obd->u.cli;
4468
4469                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4470                 lprocfs_osc_init_vars(&lvars);
4471                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4472                         lproc_osc_attach_seqstat(obd);
4473                         sptlrpc_lprocfs_cliobd_attach(obd);
4474                         ptlrpc_lprocfs_register_obd(obd);
4475                 }
4476
4477                 oscc_init(obd);
4478                 /* We need to allocate a few more requests, because
4479                    brw_interpret tries to create new requests before freeing
4480                    previous ones. Ideally we would reserve 2x
4481                    max_rpcs_in_flight, but that is probably too much wasted
4482                    RAM, so an extra 2 is a guess that should still work. */
4483                 cli->cl_import->imp_rq_pool =
4484                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4485                                             OST_MAXREQSIZE,
4486                                             ptlrpc_add_rqs_to_pool);
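                /* For example, with cl_max_rpcs_in_flight at its usual
                 * default of 8, this pre-allocates 10 requests of
                 * OST_MAXREQSIZE each. */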
4487
4488                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4489                 cfs_sema_init(&cli->cl_grant_sem, 1);
4490
4491                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4492         }
4493
4494         RETURN(rc);
4495 }
4496
4497 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4498 {
4499         int rc = 0;
4500         ENTRY;
4501
4502         switch (stage) {
4503         case OBD_CLEANUP_EARLY: {
4504                 struct obd_import *imp;
4505                 imp = obd->u.cli.cl_import;
4506                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4507                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4508                 ptlrpc_deactivate_import(imp);
4509                 cfs_spin_lock(&imp->imp_lock);
4510                 imp->imp_pingable = 0;
4511                 cfs_spin_unlock(&imp->imp_lock);
4512                 break;
4513         }
4514         case OBD_CLEANUP_EXPORTS: {
4515                 /* LU-464
4516                  * for echo client, export may be on zombie list, wait for
4517                  * zombie thread to cull it, because cli.cl_import will be
4518                  * cleared in client_disconnect_export():
4519                  *   class_export_destroy() -> obd_cleanup() ->
4520                  *   echo_device_free() -> echo_client_cleanup() ->
4521                  *   obd_disconnect() -> osc_disconnect() ->
4522                  *   client_disconnect_export()
4523                  */
4524                 obd_zombie_barrier();
4525                 obd_cleanup_client_import(obd);
4526                 rc = obd_llog_finish(obd, 0);
4527                 if (rc != 0)
4528                         CERROR("failed to cleanup llogging subsystems\n");
4529                 break;
4530         }
4531         }
4532         RETURN(rc);
4533 }
4534
4535 int osc_cleanup(struct obd_device *obd)
4536 {
4537         int rc;
4538
4539         ENTRY;
4540         ptlrpc_lprocfs_unregister_obd(obd);
4541         lprocfs_obd_cleanup(obd);
4542
4543         /* free memory of osc quota cache */
4544         lquota_cleanup(quota_interface, obd);
4545
4546         rc = client_obd_cleanup(obd);
4547
4548         ptlrpcd_decref();
4549         RETURN(rc);
4550 }
4551
4552 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4553 {
4554         struct lprocfs_static_vars lvars = { 0 };
4555         int rc = 0;
4556
4557         lprocfs_osc_init_vars(&lvars);
4558
4559         switch (lcfg->lcfg_command) {
4560         default:
4561                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4562                                               lcfg, obd);
4563                 if (rc > 0)
4564                         rc = 0;
4565                 break;
4566         }
4567
4568         return(rc);
4569 }
4570
4571 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4572 {
4573         return osc_process_config_base(obd, buf);
4574 }
4575
4576 struct obd_ops osc_obd_ops = {
4577         .o_owner                = THIS_MODULE,
4578         .o_setup                = osc_setup,
4579         .o_precleanup           = osc_precleanup,
4580         .o_cleanup              = osc_cleanup,
4581         .o_add_conn             = client_import_add_conn,
4582         .o_del_conn             = client_import_del_conn,
4583         .o_connect              = client_connect_import,
4584         .o_reconnect            = osc_reconnect,
4585         .o_disconnect           = osc_disconnect,
4586         .o_statfs               = osc_statfs,
4587         .o_statfs_async         = osc_statfs_async,
4588         .o_packmd               = osc_packmd,
4589         .o_unpackmd             = osc_unpackmd,
4590         .o_precreate            = osc_precreate,
4591         .o_create               = osc_create,
4592         .o_create_async         = osc_create_async,
4593         .o_destroy              = osc_destroy,
4594         .o_getattr              = osc_getattr,
4595         .o_getattr_async        = osc_getattr_async,
4596         .o_setattr              = osc_setattr,
4597         .o_setattr_async        = osc_setattr_async,
4598         .o_brw                  = osc_brw,
4599         .o_punch                = osc_punch,
4600         .o_sync                 = osc_sync,
4601         .o_enqueue              = osc_enqueue,
4602         .o_change_cbdata        = osc_change_cbdata,
4603         .o_find_cbdata          = osc_find_cbdata,
4604         .o_cancel               = osc_cancel,
4605         .o_cancel_unused        = osc_cancel_unused,
4606         .o_iocontrol            = osc_iocontrol,
4607         .o_get_info             = osc_get_info,
4608         .o_set_info_async       = osc_set_info_async,
4609         .o_import_event         = osc_import_event,
4610         .o_llog_init            = osc_llog_init,
4611         .o_llog_finish          = osc_llog_finish,
4612         .o_process_config       = osc_process_config,
4613 };
4614
4615 extern struct lu_kmem_descr osc_caches[];
4616 extern cfs_spinlock_t       osc_ast_guard;
4617 extern cfs_lock_class_key_t osc_ast_guard_class;
4618
4619 int __init osc_init(void)
4620 {
4621         struct lprocfs_static_vars lvars = { 0 };
4622         int rc;
4623         ENTRY;
4624
4625         /* Print the address of _any_ initialized kernel symbol from this
4626          * module, to allow debugging with a gdb that doesn't support data
4627          * symbols from modules. */
4628         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4629
4630         rc = lu_kmem_init(osc_caches);
             if (rc)
                     RETURN(rc);
4631
4632         lprocfs_osc_init_vars(&lvars);
4633
4634         cfs_request_module("lquota");
4635         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4636         lquota_init(quota_interface);
4637         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4638
4639         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4640                                  LUSTRE_OSC_NAME, &osc_device_type);
4641         if (rc) {
4642                 if (quota_interface)
4643                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4644                 lu_kmem_fini(osc_caches);
4645                 RETURN(rc);
4646         }
4647
4648         cfs_spin_lock_init(&osc_ast_guard);
4649         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4650
4651         osc_mds_ost_orig_logops = llog_lvfs_ops;
4652         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4653         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4654         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4655         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4656
4657         RETURN(rc);
4658 }
4659
4660 #ifdef __KERNEL__
4661 static void /*__exit*/ osc_exit(void)
4662 {
4663         lu_device_type_fini(&osc_device_type);
4664
4665         lquota_exit(quota_interface);
4666         if (quota_interface)
4667                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4668
4669         class_unregister_type(LUSTRE_OSC_NAME);
4670         lu_kmem_fini(osc_caches);
4671 }
4672
4673 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4674 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4675 MODULE_LICENSE("GPL");
4676
4677 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4678 #endif