Sample Details

Computer Science: Histogram Sort Algorithm Using MPI and OpenMP


Question:

This is a coding assignment: implement three functions for parallel sorting with the histogram algorithm, using MPI and OpenMP in C++.

 

Answer:
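The solution below implements a distributed histogram sort in three stages: rebalance first evens out the element counts across ranks, findSplitters then searches for keys that partition the global data into near-equal bins, and moveData finally routes every element to the rank that owns its bin, after which each rank sorts its bin locally.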

 

#include "solution.h"

 

#include

#include

#include

#include

#include

#include

 

#include "basic_defs.h"

#include "databasics.h"

 

MPI_Datatype dtype = MPI_UNSIGNED_LONG_LONG;

 

// Get the midpoint of two numbers without overflowing (a + b) / 2:
// add the halves, then add back the carry when both halves dropped one
uint64_t getMid(const uint64_t &a, const uint64_t &b) {
    uint64_t ans = a / 2 + b / 2;
    if (((a % 2) + (b % 2)) == 2) ans++;
    return ans;
}
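For example, with a = b = 2^63 the naive (a + b) / 2 overflows 64-bit arithmetic, while a / 2 + b / 2 = 2^63 is exact; the carry correction covers the case where both operands are odd (a = 3, b = 5 gives 1 + 2 + 1 = 4).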

 

// Count the items in sorted data that are less than or equal to key,
// i.e. binary-search for the first index whose element is greater than key
uint64_t getHist(const dist_sort_t *data, const dist_sort_size_t n,
                 const dist_sort_t key) {
    uint64_t a = 0, b = n;
    while (b > a) {
        uint64_t mid = getMid(a, b);
        if (key < data[mid])
            b = mid;
        else
            a = mid + 1;
    }
    return a;  // number of elements <= key; correct even with duplicates
}
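A quick serial check of this contract (a hypothetical test, assuming dist_sort_t and dist_sort_size_t are the usual 64-bit unsigned typedefs):

#include <cassert>

void testGetHist() {
    const dist_sort_t data[] = {2, 4, 4, 7};  // must be sorted
    assert(getHist(data, 4, 1) == 0);  // nothing is <= 1
    assert(getHist(data, 4, 4) == 3);  // both occurrences of 4 count
    assert(getHist(data, 4, 9) == 4);  // every element is <= 9
}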

 

// Check whether hist lands close enough (within 0.5% of dataPerProc) to an
// ideal splitter position; if so, return that splitter's index, else -1
int64_t idealSplitter(dist_sort_size_t dataPerProc, dist_sort_t hist) {
    uint64_t thresh = dataPerProc / 200;
    uint64_t offset = hist % dataPerProc;
    if (offset < thresh) {
        return (int64_t)(hist / dataPerProc) - 1;
    } else if ((dataPerProc - offset) < thresh) {
        return (int64_t)(hist / dataPerProc);
    } else {
        return -1;
    }
}
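A worked example with hypothetical numbers: with dataPerProc = 1000 the tolerance is 1000 / 200 = 5 elements. A histogram value of 2997 has offset 997, and 1000 - 997 = 3 < 5, so it is accepted as splitter index 2997 / 1000 = 2; a value of 2500 sits 500 elements from either bin boundary and is rejected with -1.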

 

void rebalance(const dist_sort_t *data, const dist_sort_size_t myDataCount,
               dist_sort_t **rebalancedData, dist_sort_size_t *rCount) {
    // Get number of processes and rank of this process
    int nProcs, rank;
    MPI_Comm_size(MPI_COMM_WORLD, &nProcs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // Sum all local counts to get the global element count
    dist_sort_size_t global_N;
    MPI_Allreduce(&myDataCount, &global_N, 1, dtype, MPI_SUM, MPI_COMM_WORLD);

    // Ideal count per process; the first `limit` ranks take one extra
    // element when global_N does not divide evenly
    uint64_t dataPerProc = global_N / nProcs;
    uint64_t limit = global_N % nProcs;

    // Size of this rank's rebalanced slice, and the output buffer for it
    dist_sort_size_t myCount = dataPerProc + ((uint64_t)rank < limit ? 1 : 0);
    dist_sort_t *balanced = (dist_sort_t *)malloc(myCount * sizeof(dist_sort_t));

    // Global index range [myStartGlobal, myEnd] currently held by this rank
    uint64_t myStartGlobal, myEnd;
    MPI_Exscan(&myDataCount, &myStartGlobal, 1, dtype, MPI_SUM, MPI_COMM_WORLD);
    if (rank == 0) myStartGlobal = 0;  // MPI_Exscan leaves rank 0 undefined
    myEnd = myStartGlobal + myDataCount - 1;

    // Expose the output buffer for one-sided writes from all ranks
    MPI_Win win;
    MPI_Win_create(balanced, myCount * sizeof(dist_sort_t), sizeof(dist_sort_t),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &win);
    MPI_Win_fence(MPI_MODE_NOPRECEDE, win);

    // Walk this rank's global range and put each contiguous chunk into
    // the rank that owns it after rebalancing
    uint64_t next = myStartGlobal;
    while (next <= myEnd) {
        uint64_t destProcs = dataPerProc + 1;  // slice size on the "fat" ranks
        uint64_t dest = next / destProcs;      // destination rank
        uint64_t disp;                         // offset within destination rank
        uint64_t size;                         // chunk length to write
        if (!(dest < limit)) {
            // Beyond the fat ranks: subtracting `limit` converts global
            // indices into uniform slices of size dataPerProc
            dest = (next - limit) / --destProcs;
            disp = (next - limit) % destProcs;
            // Clamp to this rank's last element, also in shifted coordinates
            size = std::min(destProcs * (dest + 1), myEnd + 1 - limit) -
                   (next - limit);
        } else {
            disp = next % destProcs;
            size = std::min(destProcs * (dest + 1), myEnd + 1) - next;
        }
        // Write the chunk into the destination rank's window
        MPI_Put(&data[next - myStartGlobal], size, dtype, dest, disp, size,
                dtype, win);
        next += size;
    }
    MPI_Win_fence(MPI_MODE_NOSUCCEED, win);
    MPI_Win_free(&win);

    // Hand the rebalanced buffer back to the caller
    *rebalancedData = balanced;
    *rCount = myCount;
}
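The one-sided (RMA) pattern here avoids matching sends with receives: thanks to the MPI_Exscan prefix, every rank can compute by itself the destination rank and window offset of each contiguous chunk it holds, so it simply puts the chunks in place between two fences.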

 

void findSplitters(const dist_sort_t *data, const dist_sort_size_t data_size,
                   dist_sort_t *splitters, dist_sort_size_t *counts,
                   int numSplitters) {
    // Get number of processes and rank of this process
    int nProcs, rank;
    MPI_Comm_size(MPI_COMM_WORLD, &nProcs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // Sum all local counts to get the global element count
    dist_sort_size_t global_N;
    MPI_Allreduce(&data_size, &global_N, 1, dtype, MPI_SUM, MPI_COMM_WORLD);

    // Ideal bin size per process
    uint64_t dataPerProc = global_N / nProcs;

    // Number of probe keys tested per round
    const int64_t k = 1000LL * numSplitters;
    // Number of splitters not yet satisfied
    uint64_t splittersLeft = numSplitters;

    // prefixCounts[i] = global number of elements <= splitters[i]
    std::vector<dist_sort_size_t> prefixCounts(numSplitters);
    // selected[i] marks splitter i as satisfied
    std::vector<char> selected(numSplitters, 0);

    // The final splitter always covers the whole key range
    splitters[numSplitters - 1] = DIST_SORT_MAX;
    prefixCounts[numSplitters - 1] = global_N;
    selected[numSplitters - 1] = 1;
    splittersLeft--;

    // Probe keys and the local/global histograms for them
    std::vector<dist_sort_t> probes(k), localHist(k), globalHist(k);
    if (0 == rank) {
        // Start with probes spread uniformly over the key range
#pragma omp parallel for
        for (int64_t i = 0; i < k; i++)
            probes[i] = (DIST_SORT_MAX / (k + 1)) * (i + 1);
    }

    while (splittersLeft) {
        // Send the current probes to all ranks
        MPI_Bcast(probes.data(), (int)k, dtype, 0, MPI_COMM_WORLD);

        // Local histogram: how many local elements are <= each probe
#pragma omp parallel for
        for (int64_t i = 0; i < k; i++)
            localHist[i] = getHist(data, data_size, probes[i]);

        // Sum the local histograms on the root
        MPI_Reduce(localHist.data(), globalHist.data(), (int)k, dtype, MPI_SUM,
                   0, MPI_COMM_WORLD);

        if (0 == rank) {
            // Accept every probe that lands close enough to an ideal split
#pragma omp parallel for
            for (int64_t i = 0; i < k; i++) {
                int64_t idx = idealSplitter(dataPerProc, globalHist[i]);
                if (idx > -1) {
                    // Several probes can satisfy the same splitter, so
                    // serialize the check-and-set to keep the
                    // (splitter, prefix count) pair consistent
#pragma omp critical
                    {
                        if (!selected[idx]) {
                            splitters[idx] = probes[i];
                            prefixCounts[idx] = globalHist[i];
                            selected[idx] = 1;
                        }
                    }
                }
            }
            splittersLeft = 0;
            for (int i = 0; i < numSplitters; i++)
                if (!selected[i]) splittersLeft++;

            if (splittersLeft) {
                // Re-aim the probes: for every run of unsatisfied splitters,
                // spread a proportional share of the k probes uniformly
                // between the satisfied splitters bounding the run
                int a = -1;
                int64_t probeIdx = 0;
                bool running = false;
                for (int i = 0; i < numSplitters; i++) {
                    if (running) {
                        if (selected[i]) {
                            int64_t numProbes =
                                (i - a - 1) * k / (int64_t)splittersLeft;
                            dist_sort_t start = (a == -1) ? 0 : splitters[a];
                            dist_sort_t end = splitters[i];
                            for (int64_t j = 0; j < numProbes && probeIdx < k;
                                 j++, probeIdx++) {
                                probes[probeIdx] =
                                    start +
                                    (end - start) / (numProbes + 1) * (j + 1);
                            }
                            running = false;
                            a = i;
                        }
                    } else if (selected[i]) {
                        a = i;
                    } else {
                        running = true;
                    }
                }
            }
        }
        // Tell every rank how many splitters are still unsatisfied
        MPI_Bcast(&splittersLeft, 1, dtype, 0, MPI_COMM_WORLD);
    }

    if (0 == rank) {
        // Convert the prefix counts into per-bin counts
#pragma omp parallel for
        for (int i = numSplitters - 1; i > 0; i--)
            counts[i] = prefixCounts[i] - prefixCounts[i - 1];
        counts[0] = prefixCounts[0];
    }

    // Distribute the final splitters and bin counts to every rank
    MPI_Bcast(splitters, numSplitters, dtype, 0, MPI_COMM_WORLD);
    MPI_Bcast(counts, numSplitters, dtype, 0, MPI_COMM_WORLD);
}
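In short, each round the root broadcasts k candidate keys, every rank counts how many of its (locally sorted) elements fall at or below each candidate, and the reduced global counts reveal which candidates sit within the 0.5% tolerance of an ideal bin boundary; the probes are then re-aimed at the still-unsplit gaps and the process repeats until all numSplitters boundaries are fixed.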

 

void moveData(const dist_sort_t *const sendData,
              const dist_sort_size_t sDataCount, dist_sort_t **recvData,
              dist_sort_size_t *rDataCount, const dist_sort_t *const splitters,
              const dist_sort_t *const counts, int numSplitters) {
    // Get number of processes and rank of this process
    int nProcs, rank;
    MPI_Comm_size(MPI_COMM_WORLD, &nProcs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // Number of local elements in each bin, from prefix counts against the
    // splitters (sendData must be locally sorted for getHist to work)
    std::vector<dist_sort_size_t> bins(numSplitters), prefixBins(numSplitters);
#pragma omp parallel for
    for (int i = 0; i < numSplitters; i++)
        prefixBins[i] = getHist(sendData, sDataCount, splitters[i]);

#pragma omp parallel for
    for (int i = numSplitters - 1; i > 0; i--)
        bins[i] = prefixBins[i] - prefixBins[i - 1];
    bins[0] = prefixBins[0];

    // For each bin, the number of elements contributed by lower ranks,
    // i.e. the offset where this rank's chunk starts at the destination
    std::vector<dist_sort_size_t> myStartGlobal(numSplitters);
    MPI_Exscan(bins.data(), myStartGlobal.data(), numSplitters, dtype, MPI_SUM,
               MPI_COMM_WORLD);
    if (rank == 0)
        for (int i = 0; i < numSplitters; i++) myStartGlobal[i] = 0;

    // Each rank receives exactly counts[rank] elements
    *recvData = (dist_sort_t *)malloc(counts[rank] * sizeof(dist_sort_t));
    *rDataCount = counts[rank];

    // Expose the receive buffer and put each bin into its owning rank
    // (this assumes one bin per rank, i.e. numSplitters == nProcs)
    MPI_Win win;
    MPI_Win_create(*recvData, counts[rank] * sizeof(dist_sort_t),
                   sizeof(dist_sort_t), MPI_INFO_NULL, MPI_COMM_WORLD, &win);
    MPI_Win_fence(MPI_MODE_NOPRECEDE, win);
    uint64_t start = 0;
    for (int i = 0; i < numSplitters; i++) {
        uint64_t size = bins[i];
        // Send this bin's data to rank i
        MPI_Put(&sendData[start], size, dtype, i, myStartGlobal[i], size, dtype,
                win);
        start += bins[i];
    }
    MPI_Win_fence(MPI_MODE_NOSUCCEED, win);
    MPI_Win_free(&win);
}
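Note that moveData relies on one bin per rank (numSplitters equal to the number of ranks), since bin i is put straight into rank i's window; the per-bin MPI_Exscan gives each rank the offset at which its contribution to every bin must land, again with no explicit handshaking.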

 

void sort(dist_sort_t *data, dist_sort_size_t size) {
    // You are welcome to use this sort function.
    // If you wish to implement your own, you can do that too.
    // Don't use bubblesort.
    std::sort(data, data + size);
}
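To show how the pieces fit together, here is a minimal driver sketch. It is not part of the assignment code: it assumes solution.h declares the four functions above, assumes dist_sort_t and dist_sort_size_t are both uint64_t (so the counts array can be passed to both findSplitters and moveData), and invents its own per-rank data sizes and random input.

#include "solution.h"

#include <mpi.h>
#include <cstdio>
#include <cstdlib>
#include <random>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int rank, nProcs;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nProcs);

    // Deliberately uneven amounts of random data per rank (hypothetical sizes)
    dist_sort_size_t myCount = 1000 + 500 * rank;
    dist_sort_t *data = (dist_sort_t *)malloc(myCount * sizeof(dist_sort_t));
    std::mt19937_64 gen(42 + rank);
    for (dist_sort_size_t i = 0; i < myCount; i++) data[i] = gen();

    // 1. Even out the counts across ranks
    dist_sort_t *balanced = NULL;
    dist_sort_size_t balancedCount = 0;
    rebalance(data, myCount, &balanced, &balancedCount);

    // 2. getHist() binary-searches, so local data must be sorted before probing
    sort(balanced, balancedCount);

    // 3. One bin per rank; findSplitters fixes the last splitter at DIST_SORT_MAX
    int numSplitters = nProcs;
    dist_sort_t *splitters =
        (dist_sort_t *)malloc(numSplitters * sizeof(dist_sort_t));
    dist_sort_t *counts =
        (dist_sort_t *)malloc(numSplitters * sizeof(dist_sort_t));
    findSplitters(balanced, balancedCount, splitters, counts, numSplitters);

    // 4. Route every element to the rank owning its bin, then sort locally
    dist_sort_t *finalData = NULL;
    dist_sort_size_t finalCount = 0;
    moveData(balanced, balancedCount, &finalData, &finalCount, splitters,
             counts, numSplitters);
    sort(finalData, finalCount);

    printf("rank %d holds %llu sorted elements\n", rank,
           (unsigned long long)finalCount);

    free(data); free(balanced); free(splitters); free(counts); free(finalData);
    MPI_Finalize();
    return 0;
}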

 
