import java.awt.*; // for GUI support import java.awt.image.*; // for image construction import java.net.*; // for UDP stuff import jgl.*; // for all the cool stuff // get JGL free from http://www.objectspace.com import java.io.IOException; import java.io.PrintStream; // for System.out shortcut //========================================================================== // JobMaster (C) Oct 1996 L. Vanhelsuwe - All Rights Reserved // --------- // // This program is the server-side center of command for the WWW distributed // applet parallel processing system. // // The JobMaster's main task is to: // - wait for applets requesting a job on fixed UDP port JOB_REQ_UDP_PORT // - issue them with a job description (which tackles some small part of a // large problem (raytracing in this example) // - collect "result replies" from the applets on one of the several results // (reply) UDP ports // // HISTORY // ------- // 27-OCT-1996: First version // // Author email: lva@telework.demon.co.uk //========================================================================== public class JobMaster extends Frame implements Runnable, CommonConstants { final static String PROGRAM_TITLE = "Distributed Applet-based Parallel Processing Demo"; //-------------------------------------------------- // Number of applet reply ports used. For a strong return-flow of // results, this number should be increased via command line option final int MAX_PORTS = 100; private int numResultPorts = 3; // default, webmaster can override // Generic Parallel variables private int nextPort = 0; // next reply port for results private Thread mcpThread; // "master control program" thread private ResultsReceiver[] receivers; // slave threads // Parallel application-specific variables. // Size of UDP packets containing calculated scanlines final int RESULTS_PACKET_SIZE = 2 + IMAGE_WIDTH*3; private int scanlineY = 0; // scanline for next requesting applet // The completedScanlines array tracks which scanlines have already been // processed. Since this is a Java object, we can use it to synchronize // our code around it so that multiple ResultsReceiver threads don't screw // up our image bookkeeping. private boolean[] completedScanlines = new boolean[IMAGE_HEIGHT]; private int numberOfCompletedScanlines = 0; // AWT-related variables private Panel imagePanel; // the panel where the raytracing appears private Panel buttonPanel; private Image offScreenImg = null; // Image to accumulate raytracing in private Graphics offScreenImgG; private Graphics imagePanelG; // Misc. variables private static PrintStream o = System.out; // convenient shorthand //private boolean verbose = true; // debugging output on/off private boolean verbose = false; //------------------------------------------------------------------- // JobMaster server program entry point //------------------------------------------------------------------- public static void main (String[] args) { new JobMaster(args); // start JobMaster up.. as an object. } //------------------------------------------------------------------- // JobMaster constructor. // Create GUI and start main server thread. //------------------------------------------------------------------- public JobMaster(String[] args) { // print half-decent program banner o.println( center(80, repeatChar('-', PROGRAM_TITLE.length() ))); o.println( center(80, PROGRAM_TITLE )); o.println( center(80, repeatChar('-', PROGRAM_TITLE.length() ))); o.println("Written by Laurence Vanhelsuwe (lva@telework.demon.co.uk)"); o.print ("With raytracing code by Francisco In cio de Toledo Moraes"); o.println(" (chico@dcc.unicamp.br)"); o.println(""); o.println( center(80, COPYRIGHT)); o.println(""); o.println( center(80, "\"...only Java makes it possible.\"")); o.println(""); o.println(""); parseArgs(args); // args-args-args :-) // open Server-side control and progress window setTitle( PROGRAM_TITLE ); resize(IMAGE_WIDTH + 40, IMAGE_HEIGHT + 60); imagePanel = new Panel(); buttonPanel = new Panel(); buttonPanel.add(new Button("Reset")); buttonPanel.add(new Button("Quit")); add("South", buttonPanel); add("Center", imagePanel); show(); // opens window imagePanelG = imagePanel.getGraphics(); // get drawing context // create off-screen image to accumulate raytracing in. // this image is what paint() uses to refresh our window offScreenImg = createImage(IMAGE_WIDTH, IMAGE_HEIGHT); offScreenImgG = offScreenImg.getGraphics(); if ( offScreenImg == null || offScreenImgG == null ) { System.out.println("Failed to obtain off-screen image buffer."); System.exit(10); } mcpThread = new Thread(this); mcpThread.start(); // start JobMaster for real.. } //------------------------------------------------------------------- // JobMaster main thread entry point //------------------------------------------------------------------- public void run () { o.println("JobMaster deamon running."); // create N ResultsReceiver threads/UDP ports receivers = new ResultsReceiver[ numResultPorts ]; for(int threadNum=0; threadNum < numResultPorts; threadNum++) { o.println("Creating listening UDP port deamon # " + threadNum); receivers[threadNum] = new ResultsReceiver(threadNum, RESULTS_PACKET_SIZE, this); } o.println("Entering main JobMaster loop."); playJobMaster(); o.println("JobMaster DONE! (Image completely generated)."); } //------------------------------------------------------------------- // The JobMaster's main task is to wait for applets requesting a job. // Wait on a fixed port # for incoming requests. The request contains // the applet's address. // Reply with a job spec which contains a receiver UDP port # to // return the results to. //------------------------------------------------------------------- public void playJobMaster () { while ( numberOfCompletedScanlines < IMAGE_HEIGHT ) { processJobRequests(); } } //------------------------------------------------------------------- // Wait for a job request, then reply to the applet with a job spec. //------------------------------------------------------------------- private void processJobRequests() { byte[] jobReqPacket; // wait for a job request message from an applet "out there" if (verbose) o.println("Waiting for applet requesting a job.."); try { jobReqPacket = UDPUtils.receiveUDP(JOB_REQ_PACKET_LENGTH, JOB_REQ_UDP_PORT); } catch (SocketException e) { o.println("Error while receiving job request: " + e); return; } catch (UnknownHostException e) { o.println("Error while receiving job request: " + e); return; } catch (IOException e) { o.println("Error while receiving job request: " + e); return; } if (verbose) o.println("Received a job request. Parsing.."); // extract header from reply and check validity String readyHeader = new String (jobReqPacket, 0, HEADER_FIELD_POS, HEADER_FIELD_LENGTH); if ( ! readyHeader.equals("READY ")) { o.println("Received corrupt job request msg (bad header)."); return; // ignore message } // extract applet host address strings from job request String appletHostIP = ipToString( jobReqPacket, NUM_ADDR_FIELD_POS); String appletHostName = new String (jobReqPacket, 0, TXT_ADDR_FIELD_POS, TXT_ADDR_FIELD_LENGTH); appletHostName = appletHostName.trim(); o.println("Job (line " + scanlineY + ") issued to applet on host " + appletHostIP + " (" + appletHostName + ")" ); // construct the job spec message (contains scanline Y and reply # byte yHigh = (byte) (scanlineY >> 8); byte yLow = (byte) (scanlineY & 255); int portNum = RESULTS_BASE_UDP_PORT + nextPort; byte portHigh = (byte) ( portNum >> 8); byte portLow = (byte) ( portNum & 255); byte[] jobMsg = {portHigh, portLow, yHigh, yLow}; // send job message to waiting applet if (verbose) o.println("Issuing job spec now.."); try { UDPUtils.sendUDP(jobMsg, appletHostIP, APPLET_JOB_ACK_UDP_PORT); } catch (SocketException e) { o.println("Error while transmitting job: " + e); return; } catch (UnknownHostException e) { o.println("Error while transmitting job: " + e); return; } catch (IOException e) { o.println("Error while transmitting job: " + e); return; } // inc scanline and receiver port # for next applet that comes along // begging for more work.. do { // **!! WARNING DO-WHILE scanlineY++; scanlineY %= IMAGE_HEIGHT; } while (completedScanlines[scanlineY] && numberOfCompletedScanlines < IMAGE_HEIGHT ); // find one to do // Note condition: the counter could have been incremented // while inside the while, so we need to test for it too ! nextPort++; nextPort %= numResultPorts; } //------------------------------------------------------------------- // A ResultsReceiver thread invokes us to process some results. // If the received scanline is new, then add it to the off-screen image // and tell our window to refresh itself. //------------------------------------------------------------------- public void collateResults(byte[] results, int receiverID) { MemoryImageSource mis; int receivedScanlineY; // o.println("MCP adding results from receiver # " + receiverID); receivedScanlineY = (results[0] & 255) << 8; receivedScanlineY+= (results[1] & 255); // Since several threads can be calling this method at the same time, // and we've got a structure which could become inconsistent due to // this, we need to protect the structure from race conditions. // The array of booleans flagging completed scanlines is used as a lock // to protect it and the associated counter numberOfCompletedScanlines synchronized (completedScanlines) { if ( ! completedScanlines[ receivedScanlineY ] ) { int[] scanline = buildScanline( results ); mis = new MemoryImageSource(IMAGE_WIDTH, 1, scanline, 0, IMAGE_WIDTH); Image lineImg = createImage(mis); // o.println("lineImage = " + lineImg); offScreenImgG.drawImage(lineImg, 0, receivedScanlineY, this); offScreenImgG.setColor(Color.black); offScreenImgG.drawLine(0,receivedScanlineY, 10, receivedScanlineY); // track which and howmany scanlines have been done completedScanlines[ receivedScanlineY ] = true; numberOfCompletedScanlines++; repaint(); if (countFlags() != numberOfCompletedScanlines) { o.println("Assert fails !!!!!!!!!!!"); // **!!! } o.println("Received scanline Y: " + receivedScanlineY + " (" + (IMAGE_HEIGHT - numberOfCompletedScanlines) + " still to do)"); } else { o.println("Scanline " + receivedScanlineY + " already received.. ignoring."); } } // end of critical block } //------------------------------------------------------------------- //------------------------------------------------------------------- int countFlags() { int total = 0; for (int i=0; i< IMAGE_HEIGHT; i++) { if (completedScanlines[ i ] ) total++; } return total; } //------------------------------------------------------------------- // Converts a sequence of byte r,g,b triples into a line of 32-bit // wide ARGB pixels. //------------------------------------------------------------------- private int[] buildScanline( byte[] results ) { int[] scanline = new int[ IMAGE_WIDTH ]; int red, green, blue; int rgbIndex = 0; for (int pix=0; pix < IMAGE_WIDTH; pix++) { red = (results[2 + rgbIndex + 0]) & 255; green = (results[2 + rgbIndex + 1]) & 255; blue = (results[2 + rgbIndex + 2]) & 255; rgbIndex += 3; scanline[pix] = (255 << 24) | // alpha = 100% opaque (red << 16) | (green << 8) | blue; } return scanline; } //------------------------------------------------------------------- // This method is called by AWT when our window needs repainting. //------------------------------------------------------------------- public void paint(Graphics g) { System.out.println("painting.........."); if (offScreenImg != null) { imagePanelG.drawImage(offScreenImg, 0, 0, this); } } //------------------------------------------------------------------- // This method is called by AWT when AWT GUI elements are clicked on. //------------------------------------------------------------------- public boolean action (Event event, Object arg) { if (event.target instanceof Button) { if (arg.equals("Reset")) { resetCompleteJob(); } else if (arg.equals("Quit")) { // : System.exit(10); } } return true; } //------------------------------------------------------------------- // Restart the image from zero. // Means wiping scanline flags, counter and Y pointer. //------------------------------------------------------------------- private void resetCompleteJob() { System.out.println(""); System.out.println("Resetting picture.."); System.out.println(""); synchronized (completedScanlines) { for (int i=0; i < IMAGE_HEIGHT; i++) { completedScanlines[i] = false; } numberOfCompletedScanlines = 0; scanlineY = 0; offScreenImgG.clearRect(0,0, IMAGE_WIDTH, IMAGE_HEIGHT); } repaint(); } //------------------------------------------------------------------- // check command line options: // // NUMPORTS //------------------------------------------------------------------- private void parseArgs(String[] a) { ObjectArray args; String param; int value; args = new ObjectArray(a); if (args.contains("?") || args.contains("-h")) { o.println("Usage: JobMaster [NUMPORTS ]"); System.exit(10); } if (args.contains("NUMPORTS")) { try { param = (String) args.at( args.indexOf("NUMPORTS") + 1 ); value = Integer.parseInt(param); if (value > 0 && value < MAX_PORTS) { numResultPorts = value; } else throw new IndexOutOfBoundsException("valid range is 1.." + MAX_PORTS); } catch (IndexOutOfBoundsException toink) { o.println("Error: Invalid NUMPORTS option! " + toink); } } } //------------------------------------------------------------------- // Utility function: convert 4 consecutive bytes in an array of bytes // to an Internet "dot notation" IP address, e.g. "193.89.70.32" //------------------------------------------------------------------- private String ipToString(byte[] ipAddress, int offset) { StringBuffer s = new StringBuffer(); for (int i=0; i<4 ; i++) { s.append( Integer.toString( ipAddress[offset + i] & 255 ) ); if (i != 3) { s.append("."); // need those dots inbetween the bytes } } return s.toString(); } //------------------------------------------------------------------- // Utility function to center a string, space-padded //------------------------------------------------------------------- private String center (int columns, String s) { String spacePadding; int paddingAmount; paddingAmount = (columns - s.length()) / 2; spacePadding = repeatChar(' ', paddingAmount); return spacePadding + s + spacePadding; } //------------------------------------------------------------------- // Utility function to create a string of repeated characters. //------------------------------------------------------------------- private String repeatChar( char ch, int times) { StringBuffer sb = new StringBuffer(); for (int i=0; i