Lyson Ober

Dify Tutorial | Building a Dify knowledge base using Python code

This Python-based tool converts Feishu (Lark) documents to Markdown format, addressing two key challenges in building Dify knowledge bases when using Feishu Doc: 1️⃣ The inaccessibility of online Feishu docs for direct import 2️⃣ The difficulty in segmenting Feishu content for vector-based knowledge representation. 👉 By transforming Feishu docs to Markdown, this project streamlines the process of creating well-structured, easily importable content for Dify knowledge bases, enhancing overall information management and retrieval efficiency. #dify #tutorial #dify_tutorial

Pro tip: If you want to read Feishu documents, try the open-source project feishu2md. It downloads online Feishu docs directly as Markdown files and can save images locally, and the Markdown format makes the documents very convenient to split (see the sketch after the project link below).

Open-source project link: GitHub - Wsine/feishu2md: One-click command to download Feishu documents as Markdown
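Since feishu2md leaves you with plain Markdown on disk, a natural way to prepare it for Dify is to split it on headings before importing. Here is a minimal sketch of that idea; the function name, the file name, and the choice of heading level are my own assumptions, not part of feishu2md or Dify:

import re

def split_markdown_by_heading(md_path, level=2):
    '''Split a Markdown file into chunks, one per heading of the given level.'''
    with open(md_path, 'r', encoding='utf-8') as f:
        content = f.read()
    # Match a heading of the requested level at the start of a line, e.g. "## Section"
    pattern = re.compile(rf'^{"#" * level} ', flags=re.MULTILINE)
    positions = [m.start() for m in pattern.finditer(content)]
    if not positions:
        return [content]
    chunks = []
    # Any text before the first heading becomes its own chunk
    if positions[0] > 0:
        chunks.append(content[:positions[0]].strip())
    for start, end in zip(positions, positions[1:] + [len(content)]):
        chunks.append(content[start:end].strip())
    return [c for c in chunks if c]

# Hypothetical file downloaded with feishu2md
chunks = split_markdown_by_heading("feishu_doc.md", level=2)
print(len(chunks), "chunks")

Each resulting chunk can then be pushed into a Dify dataset with the API calls shown in the code below.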

Below is the code I actually used to read the local text and import it into the knowledge base:


'''
Read the label library provided by the channel-fulfillment team and import it into the knowledge base.
'''
import json

import chardet
import pandas as pd
import requests


class CommodityKnowledge:
    # Local environment
    api_key = "dataset-tkRjxM0CmA4Vot3Y4K79xxx"
    dataset_url = "http://172.xxx/v1/datasets/"
    # Production environment
    # api_key = "dataset-vpVceWuVyZFoYZGe46c9xxx"
    # dataset_url = "http://dify.xxx.com/v1/datasets/"
    dataset_name = "商品标签库"  # the commodity label library dataset
    dataset_id = "7d99fecf-39cb-48f4-885b-8b1eb74xxx"

    def __init__(self):
        pass

    # Create a new document in the dataset from a piece of text
    def create_by_text(self, name, text):
        url = f"{self.dataset_url}{self.dataset_id}/document/create_by_text"
        payload = json.dumps({
            "name": name,
            "text": text,
            "indexing_technique": "high_quality",
            "process_rule": {
                "mode": "custom",
                "rules": {
                    "pre_processing_rules": [
                        {
                            "id": "remove_extra_spaces",
                            "enabled": True
                        }
                    ],
                    "segmentation": {
                        "separator": "\n",
                        "max_tokens": 1000
                    }
                }
            }
        })
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
            'Content-Type': 'application/json'
        }
        response = requests.post(url, headers=headers, data=payload)
        result_json = json.loads(response.text)
        document_id = result_json["document"]["id"]
        return document_id

    # Append a new segment (chunk) to an existing document
    def add_segments(self, text, document_id):
        url = f"{self.dataset_url}{self.dataset_id}/documents/{document_id}/segments"
        payload = json.dumps({
            "segments": [
                {
                    "content": text,
                    "keywords": []
                }
            ]
        })
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
            'Content-Type': 'application/json'
        }
        response = requests.post(url, headers=headers, data=payload)
        print(response.text)

    # Detect the encoding of a local file (helper, not used in the Excel flow below)
    def detect_encoding(self, file_path):
        with open(file_path, 'rb') as f:
            result = chardet.detect(f.read())
        return result['encoding']

    # Read the label library from an Excel file and import it row by row
    def read_label_file(self, file_path):
        # Load the Excel sheet
        df = pd.read_excel(file_path, engine='openpyxl')
        data = ""
        for index, row in df.iterrows():
            name = f"{row['类别']}-{row['标签']}"
            document_id = ""
            # Label name
            label_name = row['标签']
            # Category
            label_category = row['类别']
            # Label description
            label_description = row['标签描述']
            # If the keyword text exceeds 500 characters, it is cut into
            # 500-character segments with a 20-character overlap
            label_keywords = self.analyze_keywords(row['关键词'])
            # label_keywords = json.loads(row['关键词'])
            # Label type
            label_type = row['标签分类']
            for keyword in label_keywords:
                row_data = self.build_document_block(label_name, label_category,
                                                     label_description, keyword, label_type)
                if document_id == "":
                    # The first segment of this row creates the document
                    document_id = self.create_by_text(name, row_data)
                else:
                    # Remaining segments are appended to that document
                    self.add_segments(row_data, document_id)
        return data

    # Assemble one document block; serialized to a string because the Dify API
    # expects "text" / "content" to be a string rather than a JSON object
    def build_document_block(self, label_name, label_category, label_description, label_keywords, label_type):
        block = {
            "label_name": f"{label_name}",
            "label_category": f"{label_category}",
            "label_description": f"{label_description}",
            "label_keywords": f"{label_keywords}",
            "label_type": f"{label_type}"
        }
        print(block)
        return json.dumps(block, ensure_ascii=False)

    # Normalize the keyword cell: always return a list of strings
    def analyze_keywords(self, label_keywords):
        # Empty cells (NaN / None / "") get a placeholder value
        if pd.isna(label_keywords) or label_keywords == '':
            return ["无"]
        # Numeric cells are converted to a single-element string list
        if isinstance(label_keywords, (int, float)):
            return [str(label_keywords)]
        new_keywords_list = []
        # Long keyword text is cut into 500-character segments with a 20-character overlap
        if len(label_keywords) > 500:
            for i in range(0, len(label_keywords), 500):
                start = i if i == 0 else i - 20
                end = i + 500
                new_keywords_list.append(label_keywords[start:end])
        else:
            new_keywords_list.append(label_keywords)
        return new_keywords_list


if __name__ == '__main__':
    commodity_knowledge = CommodityKnowledge()
    file_path = "C:\\Users\\xxx.xxx\\Desktop\\商品评价分类及其关键词.xlsx"
    text = commodity_knowledge.read_label_file(file_path)
    print(text)
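If your source is a Markdown file produced by feishu2md rather than an Excel sheet, the same two API calls cover the import: create the document from the first chunk, then append the remaining chunks as segments. Below is a hedged sketch that reuses the CommodityKnowledge class above; the helper name import_markdown, the file path, and the split_markdown_by_heading function from the earlier sketch are placeholders of mine, not part of the original script.

def import_markdown(knowledge, md_path, doc_name):
    # split_markdown_by_heading is the splitting sketch shown earlier in this post
    chunks = split_markdown_by_heading(md_path, level=2)
    document_id = ""
    for chunk in chunks:
        if document_id == "":
            # The first chunk creates the document and triggers indexing
            document_id = knowledge.create_by_text(doc_name, chunk)
        else:
            # Later chunks are appended as extra segments
            knowledge.add_segments(chunk, document_id)
    return document_id

# Hypothetical usage with a file downloaded via feishu2md
# commodity_knowledge = CommodityKnowledge()
# import_markdown(commodity_knowledge, "feishu_doc.md", "feishu-doc-example")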


Jul 8, 2024

牛蛙JUN
