/*********************************************************************** * Ce programme emet a destination du port argv[1] sur la machine * argv[0], argv[3] messages (datagramme UDP) constitues chacun * de argv[2] caracteres. * * ex: java DgramSend linux03 2005 1000 10 * * L'emission se fait par send sur un DatagramSocket dans le domaine * INET. ***********************************************************************/ package robusttftp; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketException; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.FileChannel; import csc4509.SockUtil; /* * $Id: RobustTftpClientThread.java 29 2016-06-02 13:29:57Z conan $ */ public class RobustTftpClientThread implements Runnable{ private boolean debug = true; private long curPos, curAck, fileSize; private int type2HeaderSize; private RandomAccessFile fin; private FileChannel fc; private DatagramChannel ackChan, writeChan; private InetSocketAddress srvAddress; private ByteBuffer sendBuf, recvBuf; private byte [] bName; private String fileName; public RobustTftpClientThread(String fname, String host, int port) throws FileNotFoundException, SocketException, IOException{ InetAddress srvIpAddress; InetSocketAddress myAddress; DatagramSocket rcvSock; writeChan = DatagramChannel.open(); // on recupere l'adresse IP de la machine cible srvIpAddress = InetAddress.getByName(host); srvAddress = new InetSocketAddress(srvIpAddress,port); fileName = fname; ackChan = DatagramChannel.open(); rcvSock = ackChan.socket(); myAddress = new InetSocketAddress(port+5); rcvSock.bind(myAddress); curPos = 0L; curAck = -1L; fileSize = 0L; try { fin = new RandomAccessFile(fileName,"r"); fc = fin.getChannel(); fileSize = fc.size(); } catch (IOException ioe){ System.out.println(ioe); } sendBuf = ByteBuffer.allocate(2048); recvBuf = ByteBuffer.allocate(2048); // calculate size of the Header // header is composed of : one int to describe the number of bytes necessary for the name // the array of bytes for the file name, one long for the starting point and the size of the data (1024) bName = fname.getBytes(); type2HeaderSize = 3*Integer.SIZE/Byte.SIZE + bName.length + Long.SIZE/Byte.SIZE; } public void run(){ //receive acks InetSocketAddress clientAddress=null; int ackCount=0; System.out.println("Entering Thread waiting for acks"); do{ recvBuf.clear(); try { clientAddress = (InetSocketAddress) ackChan.receive(recvBuf); if(clientAddress != null){ if(debug){ System.out.println("Received "+recvBuf.position() + " data in msg "+ ackCount++ + " from "); SockUtil.printDebug(clientAddress); } recvBuf.flip(); int type = recvBuf.getInt(); if(debug){ System.out.println("Message type"+ type); } if(type == 3){ //ack int length = recvBuf.getInt(); if(debug){ System.out.println("Name length, bName.length "+ length +", "+bName.length); } if(length == bName.length){ byte [] remoteBname = new byte [length]; recvBuf.get(remoteBname); String remoteName = new String(remoteBname); if(remoteName.equals(fileName)){ long remoteAck = recvBuf.getLong(); if(debug){ System.out.println("Remote Ack "+ remoteAck); } if(remoteAck > curAck){ // better ack curAck = remoteAck; } if(remoteAck == fileSize){ // finished break; } if(remoteAck == curAck){ System.out.println("Remote Ack == cur Ack modify curPos to "+ remoteAck); curPos = curAck; } } } } } }catch(IOException ioe){ System.out.println(clientAddress); break; } }while(true); } public void sendFileWithAcks(){ int count = 0; long sent; sendBuf.putInt(1); sendBuf.putInt(bName.length); sendBuf.put(bName); sendBuf.putLong(fileSize); sendBuf.flip(); try{ sent = writeChan.send(sendBuf,srvAddress); Thread.yield(); if(curAck == -1L){ // not received ack resend sendBuf.rewind(); writeChan.send(sendBuf,srvAddress); } }catch(IOException ioe){ System.out.println(ioe); return; } do { if(debug){ System.out.println("curPos = "+ curPos + " curAck "+curAck); } sendBuf.clear(); try { if(curPos > curAck){ // give a chance to receive an ack try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } // header + sendBuf.putInt(2); sendBuf.putInt(bName.length); sendBuf.put(bName); sendBuf.putLong(curPos); int msgSize = ((sendBuf.capacity()-type2HeaderSize) < (int)(fileSize-curPos)) ?sendBuf.capacity()-type2HeaderSize :(int)(fileSize-curPos); sendBuf.putInt(msgSize); // curPos is modified by the ack receiver part to the last received ack position fin.seek(curPos); count = fc.read(sendBuf); sendBuf.flip(); sent = writeChan.send(sendBuf,srvAddress); } catch(IOException ioe){ System.out.println(ioe); break; } System.out.println("Sending a datagram of size " + sent); curPos += count; } while (curAck < fileSize); } }