Whamcloud - gitweb
LU-9679 lustre: use LIST_HEAD() for local lists.
[fs/lustre-release.git] / lustre / osp / osp_trans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, 2017, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_trans.c
27  *
28  *
29  * 1. OSP (Object Storage Proxy) transaction methods
30  *
31  * Implement OSP layer transaction related interfaces for the dt_device API
32  * dt_device_operations.
33  *
34  *
35  * 2. Handle asynchronous idempotent operations
36  *
37  * The OSP uses OUT (Object Unified Target) RPC to talk with other server
38  * (MDT or OST) for kinds of operations, such as create, unlink, insert,
39  * delete, lookup, set_(x)attr, get_(x)attr, and etc. To reduce the number
40  * of RPCs, we allow multiple operations to be packaged together in single
41  * OUT RPC.
42  *
43  * For the asynchronous idempotent operations, such as get_(x)attr, related
44  * RPCs will be inserted into an osp_device based shared asynchronous request
45  * queue - osp_device::opd_async_requests. When the queue is full, all the
46  * requests in the queue will be packaged into a single OUT RPC and given to
47  * the ptlrpcd daemon (for sending), then the queue is purged and other new
48  * requests can be inserted into it.
49  *
50  * When the asynchronous idempotent operation inserts the request into the
51  * shared queue, it will register an interpreter. When the packaged OUT RPC
52  * is replied (or failed to be sent out), all the registered interpreters
53  * will be called one by one to handle each own result.
54  *
55  *
56  * There are three kinds of transactions
57  *
58  * 1. Local transaction, all of updates of the transaction are in the local MDT.
59  * 2. Remote transaction, all of updates of the transaction are in one remote
60  * MDT, which only happens in LFSCK now.
61  * 3. Distribute transaction, updates for the transaction are in mulitple MDTs.
62  *
63  * Author: Di Wang <di.wang@intel.com>
64  * Author: Fan, Yong <fan.yong@intel.com>
65  */
66
67 #define DEBUG_SUBSYSTEM S_MDS
68
69 #include <lustre_net.h>
70 #include "osp_internal.h"
71
72 /**
73  * The argument for the interpreter callback of osp request.
74  */
75 struct osp_update_args {
76         struct osp_update_request *oaua_update;
77         atomic_t                 *oaua_count;
78         wait_queue_head_t        *oaua_waitq;
79         bool                      oaua_flow_control;
80         const struct lu_env      *oaua_update_env;
81 };
82
83 /**
84  * Call back for each update request.
85  */
86 struct osp_update_callback {
87         /* list in the osp_update_request::our_cb_items */
88         struct list_head                 ouc_list;
89
90         /* The target of the async update request. */
91         struct osp_object               *ouc_obj;
92
93         /* The data used by or_interpreter. */
94         void                            *ouc_data;
95
96         /* The interpreter function called after the async request handled. */
97         osp_update_interpreter_t        ouc_interpreter;
98 };
99
100 /**
101  * Allocate new update request
102  *
103  * Allocate new update request and insert it to the req_update_list.
104  *
105  * \param [in] our      osp_udate_request where to create a new
106  *                      update request
107  *
108  * \retval      0 if creation succeeds.
109  * \retval      negative errno if creation fails.
110  */
111 int osp_object_update_request_create(struct osp_update_request *our,
112                                      size_t size)
113 {
114         struct osp_update_request_sub *ours;
115         struct object_update_request *ourq;
116
117         OBD_ALLOC_PTR(ours);
118         if (ours == NULL)
119                 return -ENOMEM;
120
121         /* The object update request will be added to an SG list for
122          * bulk transfer. Some IB HW cannot handle partial pages in SG
123          * lists (since they create gaps in memory regions) so we
124          * round the size up to the next multiple of PAGE_SIZE. See
125          * LU-9983. */
126         LASSERT(size > 0);
127         size = round_up(size, PAGE_SIZE);
128         OBD_ALLOC_LARGE(ourq, size);
129         if (ourq == NULL) {
130                 OBD_FREE_PTR(ours);
131                 return -ENOMEM;
132         }
133
134         ourq->ourq_magic = UPDATE_REQUEST_MAGIC;
135         ourq->ourq_count = 0;
136         ours->ours_req = ourq;
137         ours->ours_req_size = size;
138         INIT_LIST_HEAD(&ours->ours_list);
139         list_add_tail(&ours->ours_list, &our->our_req_list);
140         our->our_req_nr++;
141
142         return 0;
143 }
144
145 /**
146  * Get current update request
147  *
148  * Get current object update request from our_req_list in
149  * osp_update_request, because we always insert the new update
150  * request in the last position, so the last update request
151  * in the list will be the current update req.
152  *
153  * \param[in] our       osp update request where to get the
154  *                      current object update.
155  *
156  * \retval              the current object update.
157  **/
158 struct osp_update_request_sub *
159 osp_current_object_update_request(struct osp_update_request *our)
160 {
161         if (list_empty(&our->our_req_list))
162                 return NULL;
163
164         return list_entry(our->our_req_list.prev, struct osp_update_request_sub,
165                           ours_list);
166 }
167
168 /**
169  * Allocate and initialize osp_update_request
170  *
171  * osp_update_request is being used to track updates being executed on
172  * this dt_device(OSD or OSP). The update buffer will be 4k initially,
173  * and increased if needed.
174  *
175  * \param [in] dt       dt device
176  *
177  * \retval              osp_update_request being allocated if succeed
178  * \retval              ERR_PTR(errno) if failed
179  */
180 struct osp_update_request *osp_update_request_create(struct dt_device *dt)
181 {
182         struct osp_update_request *our;
183         int rc;
184
185         OBD_ALLOC_PTR(our);
186         if (our == NULL)
187                 return ERR_PTR(-ENOMEM);
188
189         INIT_LIST_HEAD(&our->our_req_list);
190         INIT_LIST_HEAD(&our->our_cb_items);
191         INIT_LIST_HEAD(&our->our_list);
192         INIT_LIST_HEAD(&our->our_invalidate_cb_list);
193         spin_lock_init(&our->our_list_lock);
194
195         rc = osp_object_update_request_create(our, PAGE_SIZE);
196         if (rc != 0) {
197                 OBD_FREE_PTR(our);
198                 return ERR_PTR(rc);
199         }
200         return our;
201 }
202
203 void osp_update_request_destroy(const struct lu_env *env,
204                                 struct osp_update_request *our)
205 {
206         struct osp_update_request_sub *ours;
207         struct osp_update_request_sub *tmp;
208
209         if (our == NULL)
210                 return;
211
212         list_for_each_entry_safe(ours, tmp, &our->our_req_list, ours_list) {
213                 list_del(&ours->ours_list);
214                 if (ours->ours_req != NULL)
215                         OBD_FREE_LARGE(ours->ours_req, ours->ours_req_size);
216                 OBD_FREE_PTR(ours);
217         }
218
219         if (!list_empty(&our->our_invalidate_cb_list)) {
220                 struct lu_env lenv;
221                 struct osp_object *obj;
222                 struct osp_object *next;
223
224                 if (env == NULL) {
225                         lu_env_init(&lenv, LCT_MD_THREAD | LCT_DT_THREAD);
226                         env = &lenv;
227                 }
228
229                 list_for_each_entry_safe(obj, next,
230                                          &our->our_invalidate_cb_list,
231                                          opo_invalidate_cb_list) {
232                         spin_lock(&obj->opo_lock);
233                         list_del_init(&obj->opo_invalidate_cb_list);
234                         spin_unlock(&obj->opo_lock);
235
236                         dt_object_put(env, &obj->opo_obj);
237                 }
238
239                 if (env == &lenv)
240                         lu_env_fini(&lenv);
241         }
242
243         OBD_FREE_PTR(our);
244 }
245
246 static void
247 object_update_request_dump(const struct object_update_request *ourq,
248                            unsigned int mask)
249 {
250         unsigned int i;
251         size_t total_size = 0;
252
253         for (i = 0; i < ourq->ourq_count; i++) {
254                 struct object_update    *update;
255                 size_t                  size = 0;
256
257                 update = object_update_request_get(ourq, i, &size);
258                 LASSERT(update != NULL);
259                 CDEBUG(mask, "i = %u fid = "DFID" op = %s "
260                        "params = %d batchid = %llu size = %zu repsize %u\n",
261                        i, PFID(&update->ou_fid),
262                        update_op_str(update->ou_type),
263                        update->ou_params_count,
264                        update->ou_batchid, size,
265                        (unsigned)update->ou_result_size);
266
267                 total_size += size;
268         }
269
270         CDEBUG(mask, "updates = %p magic = %x count = %d size = %zu\n", ourq,
271                ourq->ourq_magic, ourq->ourq_count, total_size);
272 }
273
274 /**
275  * Prepare inline update request
276  *
277  * Prepare OUT update ptlrpc inline request, and the request usually includes
278  * one update buffer, which does not need bulk transfer.
279  *
280  * \param[in] env       execution environment
281  * \param[in] req       ptlrpc request
282  * \param[in] ours      sub osp_update_request to be packed
283  *
284  * \retval              0 if packing succeeds
285  * \retval              negative errno if packing fails
286  */
287 int osp_prep_inline_update_req(const struct lu_env *env,
288                                struct ptlrpc_request *req,
289                                struct osp_update_request *our,
290                                int repsize)
291 {
292         struct osp_update_request_sub *ours;
293         struct out_update_header *ouh;
294         __u32 update_req_size;
295         int rc;
296
297         ours = list_entry(our->our_req_list.next,
298                           struct osp_update_request_sub, ours_list);
299         update_req_size = object_update_request_size(ours->ours_req);
300         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT,
301                              update_req_size + sizeof(*ouh));
302
303         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
304         if (rc != 0)
305                 RETURN(rc);
306
307         ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
308         ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
309         ouh->ouh_count = 1;
310         ouh->ouh_inline_length = update_req_size;
311         ouh->ouh_reply_size = repsize;
312
313         memcpy(ouh->ouh_inline_data, ours->ours_req, update_req_size);
314
315         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
316                              RCL_SERVER, repsize);
317
318         ptlrpc_request_set_replen(req);
319         req->rq_request_portal = OUT_PORTAL;
320         req->rq_reply_portal = OSC_REPLY_PORTAL;
321
322         RETURN(rc);
323 }
324
325 /**
326  * Prepare update request.
327  *
328  * Prepare OUT update ptlrpc request, and the request usually includes
329  * all of updates (stored in \param ureq) from one operation.
330  *
331  * \param[in] env       execution environment
332  * \param[in] imp       import on which ptlrpc request will be sent
333  * \param[in] ureq      hold all of updates which will be packed into the req
334  * \param[in] reqp      request to be created
335  *
336  * \retval              0 if preparation succeeds.
337  * \retval              negative errno if preparation fails.
338  */
339 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
340                         struct osp_update_request *our,
341                         struct ptlrpc_request **reqp)
342 {
343         struct ptlrpc_request           *req;
344         struct ptlrpc_bulk_desc         *desc;
345         struct osp_update_request_sub   *ours;
346         const struct object_update_request *ourq;
347         struct out_update_header        *ouh;
348         struct out_update_buffer        *oub;
349         __u32                           buf_count = 0;
350         int                             repsize = 0;
351         struct object_update_reply      *reply;
352         int                             rc, i;
353         int                             total = 0;
354         ENTRY;
355
356         list_for_each_entry(ours, &our->our_req_list, ours_list) {
357                 object_update_request_dump(ours->ours_req, D_INFO);
358
359                 ourq = ours->ours_req;
360                 for (i = 0; i < ourq->ourq_count; i++) {
361                         struct object_update    *update;
362                         size_t                  size = 0;
363
364
365                         /* XXX: it's very inefficient to lookup update
366                          *      this way, iterating from the beginning
367                          *      each time */
368                         update = object_update_request_get(ourq, i, &size);
369                         LASSERT(update != NULL);
370
371                         repsize += sizeof(reply->ourp_lens[0]);
372                         repsize += sizeof(struct object_update_result);
373                         repsize += update->ou_result_size;
374                 }
375
376                 buf_count++;
377         }
378         repsize += sizeof(*reply);
379         if (repsize < OUT_UPDATE_REPLY_SIZE)
380                 repsize = OUT_UPDATE_REPLY_SIZE;
381         LASSERT(buf_count > 0);
382
383         req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
384         if (req == NULL)
385                 RETURN(-ENOMEM);
386
387         if (buf_count == 1) {
388                 ours = list_entry(our->our_req_list.next,
389                                   struct osp_update_request_sub, ours_list);
390
391                 /* Let's check if it can be packed inline */
392                 if (object_update_request_size(ours->ours_req) +
393                     sizeof(struct out_update_header) <
394                                 OUT_UPDATE_MAX_INLINE_SIZE) {
395                         rc = osp_prep_inline_update_req(env, req, our, repsize);
396                         if (rc == 0)
397                                 *reqp = req;
398                         GOTO(out_req, rc);
399                 }
400         }
401
402         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT,
403                              sizeof(struct osp_update_request));
404
405         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_BUF, RCL_CLIENT,
406                              buf_count * sizeof(*oub));
407
408         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
409         if (rc != 0)
410                 GOTO(out_req, rc);
411
412         ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
413         ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
414         ouh->ouh_count = buf_count;
415         ouh->ouh_inline_length = 0;
416         ouh->ouh_reply_size = repsize;
417         oub = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_BUF);
418         list_for_each_entry(ours, &our->our_req_list, ours_list) {
419                 oub->oub_size = ours->ours_req_size;
420                 oub++;
421         }
422
423         req->rq_bulk_write = 1;
424         desc = ptlrpc_prep_bulk_imp(req, buf_count,
425                 MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
426                 PTLRPC_BULK_GET_SOURCE | PTLRPC_BULK_BUF_KVEC,
427                 MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops);
428         if (desc == NULL)
429                 GOTO(out_req, rc = -ENOMEM);
430
431         /* NB req now owns desc and will free it when it gets freed */
432         list_for_each_entry(ours, &our->our_req_list, ours_list) {
433                 desc->bd_frag_ops->add_iov_frag(desc, ours->ours_req,
434                                                 ours->ours_req_size);
435                 total += ours->ours_req_size;
436         }
437         CDEBUG(D_OTHER, "total %d in %u\n", total, our->our_update_nr);
438
439         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
440                              RCL_SERVER, repsize);
441
442         ptlrpc_request_set_replen(req);
443         req->rq_request_portal = OUT_PORTAL;
444         req->rq_reply_portal = OSC_REPLY_PORTAL;
445         *reqp = req;
446
447 out_req:
448         if (rc < 0)
449                 ptlrpc_req_finished(req);
450
451         RETURN(rc);
452 }
453
454 /**
455  * Send update RPC.
456  *
457  * Send update request to the remote MDT synchronously.
458  *
459  * \param[in] env       execution environment
460  * \param[in] imp       import on which ptlrpc request will be sent
461  * \param[in] our       hold all of updates which will be packed into the req
462  * \param[in] reqp      request to be created
463  *
464  * \retval              0 if RPC succeeds.
465  * \retval              negative errno if RPC fails.
466  */
467 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
468                     struct osp_update_request *our,
469                     struct ptlrpc_request **reqp)
470 {
471         struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
472         struct ptlrpc_request   *req = NULL;
473         int                     rc;
474         ENTRY;
475
476         rc = osp_prep_update_req(env, imp, our, &req);
477         if (rc != 0)
478                 RETURN(rc);
479
480         osp_set_req_replay(osp, req);
481         req->rq_allow_intr = 1;
482
483         /* Note: some dt index api might return non-zero result here, like
484          * osd_index_ea_lookup, so we should only check rc < 0 here */
485         rc = ptlrpc_queue_wait(req);
486         our->our_rc = rc;
487         if (rc < 0 || reqp == NULL)
488                 ptlrpc_req_finished(req);
489         else
490                 *reqp = req;
491
492         RETURN(rc);
493 }
494
495 /**
496  * Invalidate all objects in the osp thandle
497  *
498  * invalidate all of objects in the update request, which will be called
499  * when the transaction is aborted.
500  *
501  * \param[in] oth       osp thandle.
502  */
503 static void osp_thandle_invalidate_object(const struct lu_env *env,
504                                           struct osp_thandle *oth,
505                                           int result)
506 {
507         struct osp_update_request *our = oth->ot_our;
508         struct osp_object *obj;
509         struct osp_object *next;
510
511         if (our == NULL)
512                 return;
513
514         list_for_each_entry_safe(obj, next, &our->our_invalidate_cb_list,
515                                  opo_invalidate_cb_list) {
516                 if (result < 0)
517                         osp_invalidate(env, &obj->opo_obj);
518
519                 spin_lock(&obj->opo_lock);
520                 list_del_init(&obj->opo_invalidate_cb_list);
521                 spin_unlock(&obj->opo_lock);
522
523                 dt_object_put(env, &obj->opo_obj);
524         }
525 }
526
527 static void osp_trans_stop_cb(const struct lu_env *env,
528                               struct osp_thandle *oth, int result)
529 {
530         struct dt_txn_commit_cb *dcb;
531         struct dt_txn_commit_cb *tmp;
532
533         /* call per-transaction stop callbacks if any */
534         list_for_each_entry_safe(dcb, tmp, &oth->ot_stop_dcb_list,
535                                  dcb_linkage) {
536                 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
537                          "commit callback entry: magic=%x name='%s'\n",
538                          dcb->dcb_magic, dcb->dcb_name);
539                 list_del_init(&dcb->dcb_linkage);
540                 dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
541         }
542
543         osp_thandle_invalidate_object(env, oth, result);
544 }
545
546 /**
547  * Allocate an osp request and initialize it with the given parameters.
548  *
549  * \param[in] obj               pointer to the operation target
550  * \param[in] data              pointer to the data used by the interpreter
551  * \param[in] interpreter       pointer to the interpreter function
552  *
553  * \retval                      pointer to the asychronous request
554  * \retval                      NULL if the allocation failed
555  */
556 static struct osp_update_callback *
557 osp_update_callback_init(struct osp_object *obj, void *data,
558                          osp_update_interpreter_t interpreter)
559 {
560         struct osp_update_callback *ouc;
561
562         OBD_ALLOC_PTR(ouc);
563         if (ouc == NULL)
564                 return NULL;
565
566         lu_object_get(osp2lu_obj(obj));
567         INIT_LIST_HEAD(&ouc->ouc_list);
568         ouc->ouc_obj = obj;
569         ouc->ouc_data = data;
570         ouc->ouc_interpreter = interpreter;
571
572         return ouc;
573 }
574
575 /**
576  * Destroy the osp_update_callback.
577  *
578  * \param[in] env       pointer to the thread context
579  * \param[in] ouc       pointer to osp_update_callback
580  */
581 static void osp_update_callback_fini(const struct lu_env *env,
582                                      struct osp_update_callback *ouc)
583 {
584         LASSERT(list_empty(&ouc->ouc_list));
585
586         lu_object_put(env, osp2lu_obj(ouc->ouc_obj));
587         OBD_FREE_PTR(ouc);
588 }
589
590 /**
591  * Interpret the packaged OUT RPC results.
592  *
593  * For every packaged sub-request, call its registered interpreter function.
594  * Then destroy the sub-request.
595  *
596  * \param[in] env       pointer to the thread context
597  * \param[in] req       pointer to the RPC
598  * \param[in] arg       pointer to data used by the interpreter
599  * \param[in] rc        the RPC return value
600  *
601  * \retval              0 for success
602  * \retval              negative error number on failure
603  */
604 static int osp_update_interpret(const struct lu_env *env,
605                                 struct ptlrpc_request *req, void *args, int rc)
606 {
607         struct object_update_reply *reply = NULL;
608         struct osp_update_args *oaua = args;
609         struct osp_update_request *our = oaua->oaua_update;
610         struct osp_thandle *oth;
611         struct osp_update_callback *ouc;
612         struct osp_update_callback *next;
613         int count = 0;
614         int index = 0;
615         int rc1 = 0;
616
617         ENTRY;
618
619         if (our == NULL)
620                 RETURN(0);
621
622         /* Sigh env might be NULL in some cases, see
623          * this calling path.
624          * osp_send_update_thread()
625          *  ptlrpc_set_wait() ----> null env.
626          *   ptlrpc_check_set()
627          *    osp_update_interpret()
628          * Let's use env in oaua for this case.
629          */
630         if (env == NULL)
631                 env = oaua->oaua_update_env;
632
633         oaua->oaua_update = NULL;
634         oth = our->our_th;
635         if (oaua->oaua_flow_control) {
636                 struct osp_device *osp;
637
638                 LASSERT(oth != NULL);
639                 osp = dt2osp_dev(oth->ot_super.th_dev);
640                 obd_put_request_slot(&osp->opd_obd->u.cli);
641         }
642
643         /* Unpack the results from the reply message. */
644         if (req->rq_repmsg != NULL && req->rq_replied) {
645                 reply = req_capsule_server_sized_get(&req->rq_pill,
646                                                      &RMF_OUT_UPDATE_REPLY,
647                                                      OUT_UPDATE_REPLY_SIZE);
648                 if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC) {
649                         if (rc == 0)
650                                 rc = -EPROTO;
651                 } else {
652                         count = reply->ourp_count;
653                 }
654         }
655
656         list_for_each_entry_safe(ouc, next, &our->our_cb_items, ouc_list) {
657                 list_del_init(&ouc->ouc_list);
658
659                 /* The peer may only have handled some requests (indicated
660                  * by the 'count') in the packaged OUT RPC, we can only get
661                  * results for the handled part. */
662                 if (index < count && reply->ourp_lens[index] > 0 && rc >= 0) {
663                         struct object_update_result *result;
664
665                         result = object_update_result_get(reply, index, NULL);
666                         if (result == NULL)
667                                 rc1 = rc = -EPROTO;
668                         else
669                                 rc1 = rc = result->our_rc;
670                 } else if (rc1 >= 0) {
671                         /* The peer did not handle these request, let's return
672                          * -EINVAL to update interpret for now */
673                         if (rc >= 0)
674                                 rc1 = -EINVAL;
675                         else
676                                 rc1 = rc;
677                 }
678
679                 if (ouc->ouc_interpreter != NULL)
680                         ouc->ouc_interpreter(env, reply, req, ouc->ouc_obj,
681                                              ouc->ouc_data, index, rc1);
682
683                 osp_update_callback_fini(env, ouc);
684                 index++;
685         }
686
687         if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count))
688                 wake_up_all(oaua->oaua_waitq);
689
690         if (oth != NULL) {
691                 /* oth and osp_update_requests will be destoryed in
692                  * osp_thandle_put */
693                 osp_trans_stop_cb(env, oth, rc);
694                 osp_thandle_put(env, oth);
695         } else {
696                 osp_update_request_destroy(env, our);
697         }
698
699         RETURN(rc);
700 }
701
702 /**
703  * Pack all the requests in the shared asynchronous idempotent request queue
704  * into a single OUT RPC that will be given to the background ptlrpcd daemon.
705  *
706  * \param[in] env       pointer to the thread context
707  * \param[in] osp       pointer to the OSP device
708  * \param[in] our       pointer to the shared queue
709  *
710  * \retval              0 for success
711  * \retval              negative error number on failure
712  */
713 int osp_unplug_async_request(const struct lu_env *env,
714                              struct osp_device *osp,
715                              struct osp_update_request *our)
716 {
717         struct osp_update_args  *args;
718         struct ptlrpc_request   *req = NULL;
719         int                      rc;
720
721         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
722                                  our, &req);
723         if (rc != 0) {
724                 struct osp_update_callback *ouc;
725                 struct osp_update_callback *next;
726
727                 list_for_each_entry_safe(ouc, next,
728                                          &our->our_cb_items, ouc_list) {
729                         list_del_init(&ouc->ouc_list);
730                         if (ouc->ouc_interpreter != NULL)
731                                 ouc->ouc_interpreter(env, NULL, NULL,
732                                                      ouc->ouc_obj,
733                                                      ouc->ouc_data, 0, rc);
734                         osp_update_callback_fini(env, ouc);
735                 }
736                 osp_update_request_destroy(env, our);
737         } else {
738                 args = ptlrpc_req_async_args(args, req);
739                 args->oaua_update = our;
740                 args->oaua_count = NULL;
741                 args->oaua_waitq = NULL;
742                 /* Note: this is asynchronous call for the request, so the
743                  * interrupte cb and current function will be different
744                  * thread, so we need use different env */
745                 args->oaua_update_env = NULL;
746                 args->oaua_flow_control = false;
747                 req->rq_interpret_reply = osp_update_interpret;
748                 ptlrpcd_add_req(req);
749         }
750
751         return rc;
752 }
753
754 /**
755  * Find or create (if NOT exist or purged) the shared asynchronous idempotent
756  * request queue - osp_device::opd_async_requests.
757  *
758  * If the osp_device::opd_async_requests is not NULL, then return it directly;
759  * otherwise create new osp_update_request and attach it to opd_async_requests.
760  *
761  * \param[in] osp       pointer to the OSP device
762  *
763  * \retval              pointer to the shared queue
764  * \retval              negative error number on failure
765  */
766 static struct osp_update_request *
767 osp_find_or_create_async_update_request(struct osp_device *osp)
768 {
769         struct osp_update_request *our = osp->opd_async_requests;
770
771         if (our != NULL)
772                 return our;
773
774         our = osp_update_request_create(&osp->opd_dt_dev);
775         if (IS_ERR(our))
776                 return our;
777
778         osp->opd_async_requests = our;
779
780         return our;
781 }
782
783 /**
784  * Insert an osp_update_callback into the osp_update_request.
785  *
786  * Insert an osp_update_callback to the osp_update_request. Usually each update
787  * in the osp_update_request will have one correspondent callback, and these
788  * callbacks will be called in rq_interpret_reply.
789  *
790  * \param[in] env               pointer to the thread context
791  * \param[in] obj               pointer to the operation target object
792  * \param[in] data              pointer to the data used by the interpreter
793  * \param[in] interpreter       pointer to the interpreter function
794  *
795  * \retval                      0 for success
796  * \retval                      negative error number on failure
797  */
798 int osp_insert_update_callback(const struct lu_env *env,
799                                struct osp_update_request *our,
800                                struct osp_object *obj, void *data,
801                                osp_update_interpreter_t interpreter)
802 {
803         struct osp_update_callback  *ouc;
804
805         ouc = osp_update_callback_init(obj, data, interpreter);
806         if (ouc == NULL)
807                 RETURN(-ENOMEM);
808
809         list_add_tail(&ouc->ouc_list, &our->our_cb_items);
810
811         return 0;
812 }
813
814 /**
815  * Insert an asynchronous idempotent request to the shared request queue that
816  * is attached to the osp_device.
817  *
818  * This function generates a new osp_async_request with the given parameters,
819  * then tries to insert the request into the osp_device-based shared request
820  * queue. If the queue is full, then triggers the packaged OUT RPC to purge
821  * the shared queue firstly, and then re-tries.
822  *
823  * NOTE: must hold the osp::opd_async_requests_mutex to serialize concurrent
824  *       osp_insert_async_request call from others.
825  *
826  * \param[in] env               pointer to the thread context
827  * \param[in] op                operation type, see 'enum update_type'
828  * \param[in] obj               pointer to the operation target
829  * \param[in] count             array size of the subsequent \a lens and \a bufs
830  * \param[in] lens              buffer length array for the subsequent \a bufs
831  * \param[in] bufs              the buffers to compose the request
832  * \param[in] data              pointer to the data used by the interpreter
833  * \param[in] repsize           how many bytes the caller allocated for \a data
834  * \param[in] interpreter       pointer to the interpreter function
835  *
836  * \retval                      0 for success
837  * \retval                      negative error number on failure
838  */
839 int osp_insert_async_request(const struct lu_env *env, enum update_type op,
840                              struct osp_object *obj, int count,
841                              __u16 *lens, const void **bufs,
842                              void *data, __u32 repsize,
843                              osp_update_interpreter_t interpreter)
844 {
845         struct osp_device               *osp;
846         struct osp_update_request       *our;
847         struct object_update            *object_update;
848         size_t                          max_update_size;
849         struct object_update_request    *ureq;
850         struct osp_update_request_sub   *ours;
851         int                             rc = 0;
852         ENTRY;
853
854         osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
855         our = osp_find_or_create_async_update_request(osp);
856         if (IS_ERR(our))
857                 RETURN(PTR_ERR(our));
858
859 again:
860         ours = osp_current_object_update_request(our);
861
862         ureq = ours->ours_req;
863         max_update_size = ours->ours_req_size -
864                           object_update_request_size(ureq);
865
866         object_update = update_buffer_get_update(ureq, ureq->ourq_count);
867         rc = out_update_pack(env, object_update, &max_update_size, op,
868                              lu_object_fid(osp2lu_obj(obj)), count, lens, bufs,
869                              repsize);
870         /* The queue is full. */
871         if (rc == -E2BIG) {
872                 osp->opd_async_requests = NULL;
873                 mutex_unlock(&osp->opd_async_requests_mutex);
874
875                 rc = osp_unplug_async_request(env, osp, our);
876                 mutex_lock(&osp->opd_async_requests_mutex);
877                 if (rc != 0)
878                         RETURN(rc);
879
880                 our = osp_find_or_create_async_update_request(osp);
881                 if (IS_ERR(our))
882                         RETURN(PTR_ERR(our));
883
884                 goto again;
885         } else {
886                 if (rc < 0)
887                         RETURN(rc);
888
889                 ureq->ourq_count++;
890                 our->our_update_nr++;
891         }
892
893         rc = osp_insert_update_callback(env, our, obj, data, interpreter);
894
895         RETURN(rc);
896 }
897
898 int osp_trans_update_request_create(struct thandle *th)
899 {
900         struct osp_thandle              *oth = thandle_to_osp_thandle(th);
901         struct osp_update_request       *our;
902
903         if (oth->ot_our != NULL)
904                 return 0;
905
906         our = osp_update_request_create(th->th_dev);
907         if (IS_ERR(our)) {
908                 th->th_result = PTR_ERR(our);
909                 return PTR_ERR(our);
910         }
911
912         oth->ot_our = our;
913         our->our_th = oth;
914
915         return 0;
916 }
917
918 void osp_thandle_destroy(const struct lu_env *env,
919                          struct osp_thandle *oth)
920 {
921         LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
922         LASSERT(list_empty(&oth->ot_commit_dcb_list));
923         LASSERT(list_empty(&oth->ot_stop_dcb_list));
924         if (oth->ot_our != NULL)
925                 osp_update_request_destroy(env, oth->ot_our);
926         OBD_FREE_PTR(oth);
927 }
928
929 /**
930  * The OSP layer dt_device_operations::dt_trans_create() interface
931  * to create a transaction.
932  *
933  * There are two kinds of transactions that will involve OSP:
934  *
935  * 1) If the transaction only contains the updates on remote server
936  *    (MDT or OST), such as re-generating the lost OST-object for
937  *    LFSCK, then it is a remote transaction. For remote transaction,
938  *    the upper layer caller (such as the LFSCK engine) will call the
939  *    dt_trans_create() (with the OSP dt_device as the parameter),
940  *    then the call will be directed to the osp_trans_create() that
941  *    creates the transaction handler and returns it to the caller.
942  *
943  * 2) If the transcation contains both local and remote updates,
944  *    such as cross MDTs create under DNE mode, then the upper layer
945  *    caller will not trigger osp_trans_create(). Instead, it will
946  *    call dt_trans_create() on other dt_device, such as LOD that
947  *    will generate the transaction handler. Such handler will be
948  *    used by the whole transaction in subsequent sub-operations.
949  *
950  * \param[in] env       pointer to the thread context
951  * \param[in] d         pointer to the OSP dt_device
952  *
953  * \retval              pointer to the transaction handler
954  * \retval              negative error number on failure
955  */
956 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
957 {
958         struct osp_thandle              *oth;
959         struct thandle                  *th = NULL;
960         ENTRY;
961
962         OBD_ALLOC_PTR(oth);
963         if (unlikely(oth == NULL))
964                 RETURN(ERR_PTR(-ENOMEM));
965
966         oth->ot_magic = OSP_THANDLE_MAGIC;
967         th = &oth->ot_super;
968         th->th_dev = d;
969
970         atomic_set(&oth->ot_refcount, 1);
971         INIT_LIST_HEAD(&oth->ot_commit_dcb_list);
972         INIT_LIST_HEAD(&oth->ot_stop_dcb_list);
973
974         RETURN(th);
975 }
976
977 /**
978  * Add commit callback to transaction.
979  *
980  * Add commit callback to the osp thandle, which will be called
981  * when the thandle is committed remotely.
982  *
983  * \param[in] th        the thandle
984  * \param[in] dcb       commit callback structure
985  *
986  * \retval              only return 0 for now.
987  */
988 int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
989 {
990         struct osp_thandle *oth = thandle_to_osp_thandle(th);
991
992         LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
993         LASSERT(&dcb->dcb_func != NULL);
994         if (dcb->dcb_flags & DCB_TRANS_STOP)
995                 list_add(&dcb->dcb_linkage, &oth->ot_stop_dcb_list);
996         else
997                 list_add(&dcb->dcb_linkage, &oth->ot_commit_dcb_list);
998         return 0;
999 }
1000
1001 static void osp_trans_commit_cb(struct osp_thandle *oth, int result)
1002 {
1003         struct dt_txn_commit_cb *dcb;
1004         struct dt_txn_commit_cb *tmp;
1005
1006         LASSERT(atomic_read(&oth->ot_refcount) > 0);
1007         /* call per-transaction callbacks if any */
1008         list_for_each_entry_safe(dcb, tmp, &oth->ot_commit_dcb_list,
1009                                  dcb_linkage) {
1010                 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
1011                          "commit callback entry: magic=%x name='%s'\n",
1012                          dcb->dcb_magic, dcb->dcb_name);
1013                 list_del_init(&dcb->dcb_linkage);
1014                 dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
1015         }
1016 }
1017
1018 static void osp_request_commit_cb(struct ptlrpc_request *req)
1019 {
1020         struct thandle          *th = req->rq_cb_data;
1021         struct osp_thandle      *oth;
1022         __u64                   last_committed_transno = 0;
1023         int                     result = req->rq_status;
1024         ENTRY;
1025
1026         if (th == NULL)
1027                 RETURN_EXIT;
1028
1029         oth = thandle_to_osp_thandle(th);
1030         if (req->rq_repmsg != NULL &&
1031             lustre_msg_get_last_committed(req->rq_repmsg))
1032                 last_committed_transno =
1033                         lustre_msg_get_last_committed(req->rq_repmsg);
1034
1035         if (last_committed_transno <
1036                 req->rq_import->imp_peer_committed_transno)
1037                 last_committed_transno =
1038                         req->rq_import->imp_peer_committed_transno;
1039
1040         CDEBUG(D_HA, "trans no %llu committed transno %llu\n",
1041                req->rq_transno, last_committed_transno);
1042
1043         /* If the transaction is not really committed, mark result = 1 */
1044         if (req->rq_transno != 0 &&
1045             (req->rq_transno > last_committed_transno) && result == 0)
1046                 result = 1;
1047
1048         osp_trans_commit_cb(oth, result);
1049         req->rq_committed = 1;
1050         osp_thandle_put(NULL, oth);
1051         EXIT;
1052 }
1053
1054 /**
1055  * callback of osp transaction
1056  *
1057  * Call all of callbacks for this osp thandle. This will only be
1058  * called in error handler path. In the normal processing path,
1059  * these callback will be called in osp_request_commit_cb() and
1060  * osp_update_interpret().
1061  *
1062  * \param [in] env      execution environment
1063  * \param [in] oth      osp thandle
1064  * \param [in] rc       result of the osp thandle
1065  */
1066 void osp_trans_callback(const struct lu_env *env,
1067                         struct osp_thandle *oth, int rc)
1068 {
1069         struct osp_update_callback *ouc;
1070         struct osp_update_callback *next;
1071
1072         if (oth->ot_our != NULL) {
1073                 list_for_each_entry_safe(ouc, next,
1074                                          &oth->ot_our->our_cb_items, ouc_list) {
1075                         list_del_init(&ouc->ouc_list);
1076                         if (ouc->ouc_interpreter != NULL)
1077                                 ouc->ouc_interpreter(env, NULL, NULL,
1078                                                      ouc->ouc_obj,
1079                                                      ouc->ouc_data, 0, rc);
1080                         osp_update_callback_fini(env, ouc);
1081                 }
1082         }
1083         osp_trans_stop_cb(env, oth, rc);
1084         osp_trans_commit_cb(oth, rc);
1085 }
1086
1087 /**
1088  * Send the request for remote updates.
1089  *
1090  * Send updates to the remote MDT. Prepare the request by osp_update_req
1091  * and send them to remote MDT, for sync request, it will wait
1092  * until the reply return, otherwise hand it to ptlrpcd.
1093  *
1094  * Please refer to osp_trans_create() for transaction type.
1095  *
1096  * \param[in] env               pointer to the thread context
1097  * \param[in] osp               pointer to the OSP device
1098  * \param[in] our               pointer to the osp_update_request
1099  *
1100  * \retval                      0 for success
1101  * \retval                      negative error number on failure
1102  */
1103 static int osp_send_update_req(const struct lu_env *env,
1104                                struct osp_device *osp,
1105                                struct osp_update_request *our)
1106 {
1107         struct osp_update_args  *args;
1108         struct ptlrpc_request   *req;
1109         struct osp_thandle      *oth = our->our_th;
1110         int     rc = 0;
1111         ENTRY;
1112
1113         LASSERT(oth != NULL);
1114         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
1115                                  our, &req);
1116         if (rc != 0) {
1117                 osp_trans_callback(env, oth, rc);
1118                 RETURN(rc);
1119         }
1120
1121         args = ptlrpc_req_async_args(args, req);
1122         args->oaua_update = our;
1123         /* set env to NULL, in case the interrupt cb and current function
1124          * are in different thread */
1125         args->oaua_update_env = NULL;
1126         osp_thandle_get(oth); /* hold for update interpret */
1127         req->rq_interpret_reply = osp_update_interpret;
1128         if (!oth->ot_super.th_wait_submit && !oth->ot_super.th_sync) {
1129                 if (!osp->opd_imp_active || !osp->opd_imp_connected) {
1130                         osp_trans_callback(env, oth, rc);
1131                         osp_thandle_put(env, oth);
1132                         GOTO(out, rc = -ENOTCONN);
1133                 }
1134
1135                 rc = obd_get_request_slot(&osp->opd_obd->u.cli);
1136                 if (rc != 0) {
1137                         osp_trans_callback(env, oth, rc);
1138                         osp_thandle_put(env, oth);
1139                         GOTO(out, rc = -ENOTCONN);
1140                 }
1141                 args->oaua_flow_control = true;
1142
1143                 if (!osp->opd_connect_mdt) {
1144                         down_read(&osp->opd_async_updates_rwsem);
1145                         args->oaua_count = &osp->opd_async_updates_count;
1146                         args->oaua_waitq = &osp->opd_sync_barrier_waitq;
1147                         up_read(&osp->opd_async_updates_rwsem);
1148                         atomic_inc(args->oaua_count);
1149                 }
1150
1151                 ptlrpcd_add_req(req);
1152                 req = NULL;
1153         } else {
1154                 osp_thandle_get(oth); /* hold for commit callback */
1155                 req->rq_commit_cb = osp_request_commit_cb;
1156                 req->rq_cb_data = &oth->ot_super;
1157                 args->oaua_flow_control = false;
1158
1159                 /* If the transaction is created during MDT recoverying
1160                  * process, it means this is an recovery update, we need
1161                  * to let OSP send it anyway without checking recoverying
1162                  * status, in case the other target is being recoveried
1163                  * at the same time, and if we wait here for the import
1164                  * to be recoveryed, it might cause deadlock */
1165                 osp_set_req_replay(osp, req);
1166
1167                 /* Because this req will be synchronus, i.e. it will be called
1168                  * in the same thread, so it will be safe to use current
1169                  * env */
1170                 args->oaua_update_env = env;
1171                 if (osp->opd_connect_mdt)
1172                         osp_get_rpc_lock(osp);
1173                 rc = ptlrpc_queue_wait(req);
1174                 if (osp->opd_connect_mdt)
1175                         osp_put_rpc_lock(osp);
1176
1177                 /* We use rq_queued_time to distinguish between local
1178                  * and remote -ENOMEM. */
1179                 if ((rc == -ENOMEM && req->rq_queued_time == 0) ||
1180                     (req->rq_transno == 0 && !req->rq_committed)) {
1181                         if (args->oaua_update != NULL) {
1182                                 /* If osp_update_interpret is not being called,
1183                                  * release the osp_thandle */
1184                                 args->oaua_update = NULL;
1185                                 osp_thandle_put(env, oth);
1186                         }
1187
1188                         req->rq_cb_data = NULL;
1189                         rc = rc == 0 ? req->rq_status : rc;
1190                         osp_trans_callback(env, oth, rc);
1191                         osp_thandle_put(env, oth);
1192                         GOTO(out, rc);
1193                 }
1194         }
1195 out:
1196         if (req != NULL)
1197                 ptlrpc_req_finished(req);
1198
1199         RETURN(rc);
1200 }
1201
1202 /**
1203  * Get local thandle for osp_thandle
1204  *
1205  * Get the local OSD thandle from the OSP thandle. Currently, there
1206  * are a few OSP API (osp_create() and osp_sync_add()) needs
1207  * to update the object on local OSD device.
1208  *
1209  * If the osp_thandle comes from normal stack (MDD->LOD->OSP), then
1210  * we will get local thandle by thandle_get_sub_by_dt.
1211  *
1212  * If the osp_thandle is remote thandle (th_top == NULL, only used
1213  * by LFSCK), then it will create a local thandle, and stop it in
1214  * osp_trans_stop(). And this only happens on OSP for OST.
1215  *
1216  * These are temporary solution, once OSP accessing OSD object is
1217  * being fixed properly, this function should be removed. XXX
1218  *
1219  * \param[in] env               pointer to the thread context
1220  * \param[in] th                pointer to the transaction handler
1221  * \param[in] dt                pointer to the OSP device
1222  *
1223  * \retval                      pointer to the local thandle
1224  * \retval                      ERR_PTR(errno) if it fails.
1225  **/
1226 struct thandle *osp_get_storage_thandle(const struct lu_env *env,
1227                                         struct thandle *th,
1228                                         struct osp_device *osp)
1229 {
1230         struct osp_thandle      *oth;
1231         struct thandle          *local_th;
1232
1233         if (th->th_top != NULL)
1234                 return thandle_get_sub_by_dt(env, th->th_top,
1235                                              osp->opd_storage);
1236
1237         LASSERT(!osp->opd_connect_mdt);
1238         oth = thandle_to_osp_thandle(th);
1239         if (oth->ot_storage_th != NULL)
1240                 return oth->ot_storage_th;
1241
1242         local_th = dt_trans_create(env, osp->opd_storage);
1243         if (IS_ERR(local_th))
1244                 return local_th;
1245
1246         oth->ot_storage_th = local_th;
1247
1248         return local_th;
1249 }
1250
1251 /**
1252  * Set version for the transaction
1253  *
1254  * Set the version for the transaction and add the request to
1255  * the sending list, then after transaction stop, the request
1256  * will be sent in the order of version by the sending thread.
1257  *
1258  * \param [in] oth      osp thandle to be set version.
1259  *
1260  * \retval              0 if set version succeeds
1261  *                      negative errno if set version fails.
1262  */
1263 int osp_check_and_set_rpc_version(struct osp_thandle *oth,
1264                                   struct osp_object *obj)
1265 {
1266         struct osp_device *osp = dt2osp_dev(oth->ot_super.th_dev);
1267         struct osp_updates *ou = osp->opd_update;
1268
1269         if (ou == NULL)
1270                 return -EIO;
1271
1272         if (oth->ot_our->our_version != 0)
1273                 return 0;
1274
1275         spin_lock(&ou->ou_lock);
1276         spin_lock(&oth->ot_our->our_list_lock);
1277         if (obj->opo_stale) {
1278                 spin_unlock(&oth->ot_our->our_list_lock);
1279                 spin_unlock(&ou->ou_lock);
1280                 return -ESTALE;
1281         }
1282
1283         /* Assign the version and add it to the sending list */
1284         osp_thandle_get(oth);
1285         oth->ot_our->our_version = ou->ou_version++;
1286         oth->ot_our->our_generation = ou->ou_generation;
1287         list_add_tail(&oth->ot_our->our_list,
1288                       &osp->opd_update->ou_list);
1289         oth->ot_our->our_req_ready = 0;
1290         spin_unlock(&oth->ot_our->our_list_lock);
1291         spin_unlock(&ou->ou_lock);
1292
1293         LASSERT(oth->ot_super.th_wait_submit == 1);
1294         CDEBUG(D_INFO, "%s: version %llu gen %llu oth:version %p:%llu\n",
1295                osp->opd_obd->obd_name, ou->ou_version, ou->ou_generation, oth,
1296                oth->ot_our->our_version);
1297
1298         return 0;
1299 }
1300
1301 /**
1302  * Get next OSP update request in the sending list
1303  * Get next OSP update request in the sending list by version number, next
1304  * request will be
1305  * 1. transaction which does not have a version number.
1306  * 2. transaction whose version == opd_rpc_version.
1307  *
1308  * \param [in] ou       osp update structure.
1309  * \param [out] ourp    the pointer holding the next update request.
1310  *
1311  * \retval              true if getting the next transaction.
1312  * \retval              false if not getting the next transaction.
1313  */
1314 static bool
1315 osp_get_next_request(struct osp_updates *ou, struct osp_update_request **ourp)
1316 {
1317         struct osp_update_request *our;
1318         struct osp_update_request *tmp;
1319         bool                    got_req = false;
1320
1321         spin_lock(&ou->ou_lock);
1322         list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
1323                 LASSERT(our->our_th != NULL);
1324                 CDEBUG(D_HA, "ou %p version %llu rpc_version %llu\n",
1325                        ou, our->our_version, ou->ou_rpc_version);
1326                 spin_lock(&our->our_list_lock);
1327                 /* Find next osp_update_request in the list */
1328                 if (our->our_version == ou->ou_rpc_version &&
1329                     our->our_req_ready) {
1330                         list_del_init(&our->our_list);
1331                         spin_unlock(&our->our_list_lock);
1332                         *ourp = our;
1333                         got_req = true;
1334                         break;
1335                 }
1336                 spin_unlock(&our->our_list_lock);
1337         }
1338         spin_unlock(&ou->ou_lock);
1339
1340         return got_req;
1341 }
1342
1343 /**
1344  * Invalidate update request
1345  *
1346  * Invalidate update request in the OSP sending list, so all of
1347  * requests in the sending list will return error, which happens
1348  * when it finds one update (with writing llog) requests fails or
1349  * the OSP is evicted by remote target. see osp_send_update_thread().
1350  *
1351  * \param[in] osp       OSP device whose update requests will be
1352  *                      invalidated.
1353  **/
1354 void osp_invalidate_request(struct osp_device *osp)
1355 {
1356         struct lu_env env;
1357         struct osp_updates *ou = osp->opd_update;
1358         struct osp_update_request *our;
1359         struct osp_update_request *tmp;
1360         LIST_HEAD(list);
1361         int                     rc;
1362         ENTRY;
1363
1364         if (ou == NULL)
1365                 return;
1366
1367         rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
1368         if (rc < 0) {
1369                 CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
1370                        rc);
1371
1372                 spin_lock(&ou->ou_lock);
1373                 ou->ou_generation++;
1374                 spin_unlock(&ou->ou_lock);
1375
1376                 return;
1377         }
1378
1379         spin_lock(&ou->ou_lock);
1380         /* invalidate all of request in the sending list */
1381         list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
1382                 spin_lock(&our->our_list_lock);
1383                 if (our->our_req_ready)
1384                         list_move(&our->our_list, &list);
1385                 else
1386                         list_del_init(&our->our_list);
1387
1388                 if (our->our_th->ot_super.th_result == 0)
1389                         our->our_th->ot_super.th_result = -EIO;
1390
1391                 if (our->our_version >= ou->ou_rpc_version)
1392                         ou->ou_rpc_version = our->our_version + 1;
1393                 spin_unlock(&our->our_list_lock);
1394
1395                 CDEBUG(D_HA, "%s invalidate our %p\n", osp->opd_obd->obd_name,
1396                        our);
1397         }
1398
1399         /* Increase the generation, then the update request with old generation
1400          * will fail with -EIO. */
1401         ou->ou_generation++;
1402         spin_unlock(&ou->ou_lock);
1403
1404         /* invalidate all of request in the sending list */
1405         list_for_each_entry_safe(our, tmp, &list, our_list) {
1406                 spin_lock(&our->our_list_lock);
1407                 list_del_init(&our->our_list);
1408                 spin_unlock(&our->our_list_lock);
1409                 osp_trans_callback(&env, our->our_th,
1410                                    our->our_th->ot_super.th_result);
1411                 osp_thandle_put(&env, our->our_th);
1412         }
1413         lu_env_fini(&env);
1414 }
1415
1416 /**
1417  * Sending update thread
1418  *
1419  * Create thread to send update request to other MDTs, this thread will pull
1420  * out update request from the list in OSP by version number, i.e. it will
1421  * make sure the update request with lower version number will be sent first.
1422  *
1423  * \param[in] arg       hold the OSP device.
1424  *
1425  * \retval              0 if the thread is created successfully.
1426  * \retal               negative error if the thread is not created
1427  *                      successfully.
1428  */
1429 int osp_send_update_thread(void *arg)
1430 {
1431         struct lu_env           env;
1432         struct osp_device       *osp = arg;
1433         struct osp_updates      *ou = osp->opd_update;
1434         struct ptlrpc_thread    *thread = &osp->opd_update_thread;
1435         struct osp_update_request *our = NULL;
1436         int                     rc;
1437         ENTRY;
1438
1439         LASSERT(ou != NULL);
1440         rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
1441         if (rc < 0) {
1442                 CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
1443                        rc);
1444                 RETURN(rc);
1445         }
1446
1447         thread->t_flags = SVC_RUNNING;
1448         wake_up(&thread->t_ctl_waitq);
1449         while (1) {
1450                 our = NULL;
1451                 wait_event_idle(ou->ou_waitq,
1452                                 !osp_send_update_thread_running(osp) ||
1453                                 osp_get_next_request(ou, &our));
1454
1455                 if (!osp_send_update_thread_running(osp)) {
1456                         if (our != NULL) {
1457                                 osp_trans_callback(&env, our->our_th, -EINTR);
1458                                 osp_thandle_put(&env, our->our_th);
1459                         }
1460                         break;
1461                 }
1462
1463                 LASSERT(our->our_th != NULL);
1464                 if (our->our_th->ot_super.th_result != 0) {
1465                         osp_trans_callback(&env, our->our_th,
1466                                 our->our_th->ot_super.th_result);
1467                         rc = our->our_th->ot_super.th_result;
1468                 } else if (ou->ou_generation != our->our_generation ||
1469                            OBD_FAIL_CHECK(OBD_FAIL_INVALIDATE_UPDATE)) {
1470                         rc = -EIO;
1471                         osp_trans_callback(&env, our->our_th, rc);
1472                 } else {
1473                         rc = osp_send_update_req(&env, osp, our);
1474                 }
1475
1476                 /* Update the rpc version */
1477                 spin_lock(&ou->ou_lock);
1478                 if (our->our_version == ou->ou_rpc_version)
1479                         ou->ou_rpc_version++;
1480                 spin_unlock(&ou->ou_lock);
1481
1482                 /* If one update request fails, let's fail all of the requests
1483                  * in the sending list, because the request in the sending
1484                  * list are dependent on either other, continue sending these
1485                  * request might cause llog or filesystem corruption */
1486                 if (rc < 0)
1487                         osp_invalidate_request(osp);
1488
1489                 /* Balanced for thandle_get in osp_check_and_set_rpc_version */
1490                 osp_thandle_put(&env, our->our_th);
1491         }
1492
1493         thread->t_flags = SVC_STOPPED;
1494         lu_env_fini(&env);
1495         wake_up(&thread->t_ctl_waitq);
1496
1497         RETURN(0);
1498 }
1499
1500 /**
1501  * The OSP layer dt_device_operations::dt_trans_start() interface
1502  * to start the transaction.
1503  *
1504  * If the transaction is a remote transaction, then related remote
1505  * updates will be triggered in the osp_trans_stop().
1506  * Please refer to osp_trans_create() for transaction type.
1507  *
1508  * \param[in] env               pointer to the thread context
1509  * \param[in] dt                pointer to the OSP dt_device
1510  * \param[in] th                pointer to the transaction handler
1511  *
1512  * \retval                      0 for success
1513  * \retval                      negative error number on failure
1514  */
1515 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
1516                     struct thandle *th)
1517 {
1518         struct osp_thandle      *oth = thandle_to_osp_thandle(th);
1519
1520         if (oth->ot_super.th_sync)
1521                 oth->ot_our->our_flags |= UPDATE_FL_SYNC;
1522         /* For remote thandle, if there are local thandle, start it here*/
1523         if (is_only_remote_trans(th) && oth->ot_storage_th != NULL)
1524                 return dt_trans_start(env, oth->ot_storage_th->th_dev,
1525                                       oth->ot_storage_th);
1526         return 0;
1527 }
1528
1529 /**
1530  * The OSP layer dt_device_operations::dt_trans_stop() interface
1531  * to stop the transaction.
1532  *
1533  * If the transaction is a remote transaction, related remote
1534  * updates will be triggered at the end of this function.
1535  *
1536  * For synchronous mode update or any failed update, the request
1537  * will be destroyed explicitly when the osp_trans_stop().
1538  *
1539  * Please refer to osp_trans_create() for transaction type.
1540  *
1541  * \param[in] env               pointer to the thread context
1542  * \param[in] dt                pointer to the OSP dt_device
1543  * \param[in] th                pointer to the transaction handler
1544  *
1545  * \retval                      0 for success
1546  * \retval                      negative error number on failure
1547  */
1548 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
1549                    struct thandle *th)
1550 {
1551         struct osp_thandle       *oth = thandle_to_osp_thandle(th);
1552         struct osp_update_request *our = oth->ot_our;
1553         struct osp_device        *osp = dt2osp_dev(dt);
1554         int                      rc = 0;
1555         ENTRY;
1556
1557         /* For remote transaction, if there is local storage thandle,
1558          * stop it first */
1559         if (oth->ot_storage_th != NULL && th->th_top == NULL) {
1560                 dt_trans_stop(env, oth->ot_storage_th->th_dev,
1561                               oth->ot_storage_th);
1562                 oth->ot_storage_th = NULL;
1563         }
1564
1565         if (our == NULL || list_empty(&our->our_req_list)) {
1566                 osp_trans_callback(env, oth, th->th_result);
1567                 GOTO(out, rc = th->th_result);
1568         }
1569
1570         if (!osp->opd_connect_mdt) {
1571                 osp_trans_callback(env, oth, th->th_result);
1572                 rc = osp_send_update_req(env, osp, oth->ot_our);
1573                 GOTO(out, rc);
1574         }
1575
1576         if (osp->opd_update == NULL ||
1577             !osp_send_update_thread_running(osp)) {
1578                 osp_trans_callback(env, oth, -EIO);
1579                 GOTO(out, rc = -EIO);
1580         }
1581
1582         CDEBUG(D_HA, "%s: add oth %p with version %llu\n",
1583                osp->opd_obd->obd_name, oth, our->our_version);
1584
1585         LASSERT(our->our_req_ready == 0);
1586         spin_lock(&our->our_list_lock);
1587         if (likely(!list_empty(&our->our_list))) {
1588                 /* notify sending thread */
1589                 our->our_req_ready = 1;
1590                 wake_up(&osp->opd_update->ou_waitq);
1591                 spin_unlock(&our->our_list_lock);
1592         } else if (th->th_result == 0) {
1593                 /* if the request does not needs to be serialized,
1594                  * read-only request etc, let's send it right away */
1595                 spin_unlock(&our->our_list_lock);
1596                 rc = osp_send_update_req(env, osp, our);
1597         } else {
1598                 spin_unlock(&our->our_list_lock);
1599                 osp_trans_callback(env, oth, th->th_result);
1600         }
1601 out:
1602         osp_thandle_put(env, oth);
1603
1604         RETURN(rc);
1605 }