What if the path is passed to this method as a parameter?

If the path is passed as a parameter, you can modify the code above slightly so that it accepts the file path as a command-line argument or a function parameter. Here is an example that takes the file path as a function parameter:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class FileUpload {
    private static final long MAX_HDFS_UPLOAD_SIZE = 10 * 1024 * 1024; // 10MB

    public static void uploadToHDFS(String localFilePath, String hdfsFilePath) throws IOException {
        Configuration conf = new Configuration();
        // Use newInstance() rather than get() so closing this FileSystem does not
        // close the cached instance shared with other callers.
        try (FileSystem fs = FileSystem.newInstance(conf)) {
            // delSrc = false (keep the local file), overwrite = true
            fs.copyFromLocalFile(false, true, new Path(localFilePath), new Path(hdfsFilePath));
        }
    }

    public static void uploadToHBase(String localFilePath, String hbaseTableName, String hbaseRowKey) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(hbaseTableName))) {
            byte[] fileBytes = Files.readAllBytes(new File(localFilePath).toPath());

            Put put = new Put(Bytes.toBytes(hbaseRowKey));
            // Store the file contents under column family "cf", qualifier "file";
            // the table must already have this column family.
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("file"), fileBytes);
            table.put(put);
        }
    }

    public static void processFile(String filePath, String hdfsFilePath, String hbaseTableName, String hbaseRowKey) {
        File file = new File(filePath);
        long fileSize = file.length();

        if (fileSize > MAX_HDFS_UPLOAD_SIZE) {
            try {
                uploadToHDFS(filePath, hdfsFilePath);
                System.out.println("File uploaded to HDFS");
            } catch (IOException e) {
                System.err.println("Failed to upload file to HDFS: " + e.getMessage());
            }
        } else {
            try {
                uploadToHBase(filePath, hbaseTableName, hbaseRowKey);
                System.out.println("File uploaded to HBase");
            } catch (IOException e) {
                System.err.println("Failed to upload file to HBase: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        if (args.length < 4) {
            System.err.println("Usage: java FileUpload <file_path> <hdfs_file_path> <hbase_table_name> <hbase_row_key>");
            System.exit(1);
        }

        String filePath = args[0];
        String hdfsFilePath = args[1];
        String hbaseTableName = args[2];
        String hbaseRowKey = args[3];

        processFile(filePath, hdfsFilePath, hbaseTableName, hbaseRowKey);
    }
}

In this modified code, a new method named processFile() is added; it takes the file path along with the HDFS- and HBase-related parameters as arguments. In main(), the inputs are read from the command-line arguments, and processFile() is called to handle the file.
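The size-based routing rule in processFile() can also be illustrated in isolation. The sketch below is a hypothetical standalone helper (UploadRouter is not part of the code above); it only mirrors the same threshold comparison:

```java
public class UploadRouter {
    // Same 10 MB threshold used in FileUpload
    static final long MAX_HDFS_UPLOAD_SIZE = 10 * 1024 * 1024;

    // Files larger than the threshold go to HDFS; everything else goes to HBase
    static String chooseStore(long fileSizeBytes) {
        return fileSizeBytes > MAX_HDFS_UPLOAD_SIZE ? "HDFS" : "HBase";
    }

    public static void main(String[] args) {
        System.out.println(chooseStore(50L * 1024 * 1024)); // 50 MB -> HDFS
        System.out.println(chooseStore(4096));              // 4 KB  -> HBase
    }
}
```

Note that a file of exactly 10 MB falls into the HBase branch, since the comparison is strictly greater-than.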

To run the code, use the following command:

java FileUpload <file_path> <hdfs_file_path> <hbase_table_name> <hbase_row_key>

Make sure to replace <file_path> with the path of the local file, <hdfs_file_path> with the target path in HDFS, <hbase_table_name> with the name of the HBase table, and <hbase_row_key> with the unique row key for the HBase row.
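To compile and run against an actual cluster, the Hadoop and HBase client jars must be on the classpath. Assuming the hadoop and hbase launcher scripts are installed and on the PATH, one common approach is the following (the file path, HDFS path, table name, and row key shown are purely illustrative):

```shell
# Compile against the Hadoop and HBase client libraries
javac -cp "$(hadoop classpath):$(hbase classpath)" FileUpload.java

# Run, passing the four arguments described above (example values)
java -cp ".:$(hadoop classpath):$(hbase classpath)" FileUpload \
    /tmp/report.bin /user/hadoop/report.bin files_table row1
```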

I hope this modification meets your needs! If you have any other questions, feel free to ask.